[VOL-2688] Improve core model performance

This commit addresses the low-hanging performance hogs in the
core model.  In particular, the following changes are made:

1) Remove proto message comparision when it's possible.  The proto
message deep comparison is quite expensive.
2) Since the Core already has a lock on the device/logicaldevice/
adapters/etc before invoking the model proxy then there is no
need for the latter to create an additional lock on these artifacts
duting an update
3) The model creates a watch on every artifacts it adds to the KV
store.   Since in the next Voltha release we will not be using Voltha
Core in pairs then there is no point in keeping these watches (these
is only 1 Core that will ever update an artifact in the next
deployment).  This update removes these watch.
4) Additional unit tests has been created, mostly around flows, in an
attempt to exercise both the core and the model further.

Change-Id: Ieaf1f6b9b05c56e819600bc55b46a05f73b8efcf
diff --git a/VERSION b/VERSION
index 0bee604..ca2d639 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.3.3
+2.3.3-dev
diff --git a/db/model/node.go b/db/model/node.go
index bec07a5..152bf29 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -598,29 +598,23 @@
 		return nil
 	}
 
-	// TODO: validate that this actually works
-	//if n.hasChildren(data) {
-	//	return nil
-	//}
-
 	if n.GetProxy() != nil {
 		log.Debug("invoking proxy PreUpdate Callbacks")
 		n.GetProxy().InvokeCallbacks(ctx, PreUpdate, false, branch.GetLatest(), data)
 	}
 
-	if branch.GetLatest().GetData().(proto.Message).String() != data.(proto.Message).String() {
-		if strict {
-			// TODO: checkAccessViolations(data, Branch.GetLatest.data)
-			log.Debugf("checking access violations")
-		}
-
-		rev := branch.GetLatest().UpdateData(ctx, data, branch)
-		changes := []ChangeTuple{{PostUpdate, branch.GetLatest().GetData(), rev.GetData()}}
-		n.makeLatest(branch, rev, changes)
-
-		return rev
+	if strict {
+		// TODO: checkAccessViolations(data, Branch.GetLatest.data)
+		log.Warn("access-violations-not-supported")
 	}
-	return branch.GetLatest()
+
+	// The way the model is used, this function is only invoked upon data change.  Therefore, to also
+	// avoid a deep proto.message comparison (expensive), just create a new branch regardless
+	rev := branch.GetLatest().UpdateData(ctx, data, branch)
+	changes := []ChangeTuple{{PostUpdate, branch.GetLatest().GetData(), rev.GetData()}}
+	n.makeLatest(branch, rev, changes)
+
+	return rev
 }
 
 // Add inserts a new node at the specified path with the provided data
@@ -691,7 +685,6 @@
 
 				updatedRev := rev.UpdateChildren(ctx, name, children, branch)
 				changes := []ChangeTuple{{PostAdd, nil, childRev.GetData()}}
-				childRev.SetupWatch(ctx, childRev.GetName())
 
 				n.makeLatest(branch, updatedRev, changes)
 
diff --git a/db/model/non_persisted_revision.go b/db/model/non_persisted_revision.go
index 347be0d..20aeec3 100644
--- a/db/model/non_persisted_revision.go
+++ b/db/model/non_persisted_revision.go
@@ -384,29 +384,10 @@
 
 			// Does the existing list contain a child with that name?
 			if nameExists {
-				// Check if the data has changed or not
-				if existingChildren[nameIndex].GetData().(proto.Message).String() != newChild.GetData().(proto.Message).String() {
-					log.Debugw("replacing-existing-child", log.Fields{
-						"old-hash": existingChildren[nameIndex].GetHash(),
-						"old-data": existingChildren[nameIndex].GetData(),
-						"new-hash": newChild.GetHash(),
-						"new-data": newChild.GetData(),
-					})
-
-					// replace entry
-					newChild.getNode().SetRoot(existingChildren[nameIndex].getNode().GetRoot())
-					updatedChildren = append(updatedChildren, newChild)
-				} else {
-					log.Debugw("keeping-existing-child", log.Fields{
-						"old-hash": existingChildren[nameIndex].GetHash(),
-						"old-data": existingChildren[nameIndex].GetData(),
-						"new-hash": newChild.GetHash(),
-						"new-data": newChild.GetData(),
-					})
-
-					// keep existing entry
-					updatedChildren = append(updatedChildren, existingChildren[nameIndex])
-				}
+				// This function is invoked only when the data has actually changed (current Core usage).  Therefore,
+				// we need to avoid an expensive deep proto.message comparison and treat the data as an update
+				newChild.getNode().SetRoot(existingChildren[nameIndex].getNode().GetRoot())
+				updatedChildren = append(updatedChildren, newChild)
 			} else {
 				log.Debugw("adding-unknown-child", log.Fields{
 					"hash": newChild.GetHash(),
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 3637e9a..15e438c 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -25,7 +25,6 @@
 	"sync"
 
 	"github.com/golang/protobuf/proto"
-	"github.com/google/uuid"
 	"github.com/opencord/voltha-lib-go/v3/pkg/db"
 	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -42,21 +41,6 @@
 	versionMutex sync.RWMutex
 	Version      int64
 	isStored     bool
-	isWatched    bool
-}
-
-type watchCache struct {
-	Cache sync.Map
-}
-
-var watchCacheInstance *watchCache
-var watchCacheOne sync.Once
-
-func watches() *watchCache {
-	watchCacheOne.Do(func() {
-		watchCacheInstance = &watchCache{Cache: sync.Map{}}
-	})
-	return watchCacheInstance
 }
 
 // NewPersistedRevision creates a new instance of a PersistentRevision structure
@@ -119,154 +103,6 @@
 	}
 }
 
-// SetupWatch -
-func (pr *PersistedRevision) SetupWatch(ctx context.Context, key string) {
-	if key == "" {
-		log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
-		return
-	}
-
-	if _, exists := watches().Cache.LoadOrStore(key+"-"+pr.GetHash(), struct{}{}); exists {
-		return
-	}
-
-	if pr.events == nil {
-		pr.events = make(chan *kvstore.Event)
-
-		log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash()})
-
-		pr.SetName(key)
-		pr.events = pr.kvStore.CreateWatch(ctx, key, false)
-	}
-
-	if !pr.isWatched {
-		pr.isWatched = true
-
-		log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash()})
-
-		// Start watching
-		go pr.startWatching(ctx)
-	}
-}
-
-func (pr *PersistedRevision) startWatching(ctx context.Context) {
-	log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
-
-StopWatchLoop:
-	for {
-		latestRev := pr.GetBranch().GetLatest()
-		event, ok := <-pr.events
-		if !ok {
-			log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
-			break StopWatchLoop
-		}
-		log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": latestRev.GetName()})
-
-		switch event.EventType {
-		case kvstore.DELETE:
-			log.Debugw("delete-from-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
-
-			// Remove reference from cache
-			getRevCache().Delete(latestRev.GetName())
-
-			// Remove reference from parent
-			parent := pr.GetBranch().Node.GetRoot()
-			parent.GetBranch(NONE).Latest.ChildDropByName(latestRev.GetName())
-
-			break StopWatchLoop
-
-		case kvstore.PUT:
-			log.Debugw("update-in-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
-			if latestRev.getVersion() >= event.Version {
-				log.Debugw("skipping-matching-or-older-revision", log.Fields{
-					"watch":          latestRev.GetName(),
-					"watch-version":  event.Version,
-					"latest-version": latestRev.getVersion(),
-				})
-				continue
-			} else {
-				log.Debugw("watch-revision-is-newer", log.Fields{
-					"watch":          latestRev.GetName(),
-					"watch-version":  event.Version,
-					"latest-version": latestRev.getVersion(),
-				})
-			}
-
-			data := reflect.New(reflect.TypeOf(latestRev.GetData()).Elem())
-
-			if err := proto.Unmarshal(event.Value.([]byte), data.Interface().(proto.Message)); err != nil {
-				log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "error": err})
-			} else {
-				log.Debugw("un-marshaled-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "data": data.Interface()})
-
-				var pathLock string
-
-				// The watch reported new persistence data.
-				// Construct an object that will be used to update the memory
-				blobs := make(map[string]*kvstore.KVPair)
-				key, _ := kvstore.ToString(event.Key)
-				blobs[key] = &kvstore.KVPair{
-					Key:     key,
-					Value:   event.Value,
-					Session: "",
-					Lease:   0,
-					Version: event.Version,
-				}
-
-				if latestRev.getNode().GetProxy() != nil {
-					//
-					// If a proxy exists for this revision, use it to lock access to the path
-					// and prevent simultaneous updates to the object in memory
-					//
-
-					//If the proxy already has a request in progress, then there is no need to process the watch
-					if latestRev.getNode().GetProxy().GetOperation() != ProxyNone {
-						log.Debugw("operation-in-progress", log.Fields{
-							"key":       latestRev.GetHash(),
-							"path":      latestRev.getNode().GetProxy().getFullPath(),
-							"operation": latestRev.getNode().GetProxy().operation.String(),
-						})
-						continue
-					}
-
-					pathLock, _ = latestRev.getNode().GetProxy().parseForControlledPath(latestRev.getNode().GetProxy().getFullPath())
-
-					// Reserve the path to prevent others to modify while we reload from persistence
-					if _, err = latestRev.getNode().GetProxy().getRoot().KvStore.Client.Reserve(ctx, pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
-						log.Errorw("Unable to acquire a key and set it to a given value", log.Fields{"error": err})
-					}
-					latestRev.getNode().GetProxy().SetOperation(ProxyWatch)
-
-					// Load changes and apply to memory
-					if _, err = latestRev.LoadFromPersistence(ctx, latestRev.GetName(), "", blobs); err != nil {
-						log.Errorw("Unable to refresh the memory by adding missing entries", log.Fields{"error": err})
-					}
-
-					// Release path
-					if err = latestRev.getNode().GetProxy().getRoot().KvStore.Client.ReleaseReservation(ctx, pathLock+"_"); err != nil {
-						log.Errorw("Unable to release reservation for a specific key", log.Fields{"error": err})
-					}
-				} else {
-					// This block should be reached only if coming from a non-proxied request
-					log.Debugw("revision-with-no-proxy", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
-
-					// Load changes and apply to memory
-					if _, err = latestRev.LoadFromPersistence(ctx, latestRev.GetName(), "", blobs); err != nil {
-						log.Errorw("Unable to refresh the memory by adding missing entries", log.Fields{"error": err})
-					}
-				}
-			}
-
-		default:
-			log.Debugw("unhandled-event", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "type": event.EventType})
-		}
-	}
-
-	watches().Cache.Delete(pr.GetName() + "-" + pr.GetHash())
-
-	log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
-}
-
 // UpdateData modifies the information in the data model and saves it in the persistent storage
 func (pr *PersistedRevision) UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision {
 	log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
@@ -274,12 +110,11 @@
 	newNPR := pr.Revision.UpdateData(ctx, data, branch)
 
 	newPR := &PersistedRevision{
-		Revision:  newNPR,
-		Compress:  pr.Compress,
-		kvStore:   pr.kvStore,
-		events:    pr.events,
-		Version:   pr.getVersion(),
-		isWatched: pr.isWatched,
+		Revision: newNPR,
+		Compress: pr.Compress,
+		kvStore:  pr.kvStore,
+		events:   pr.events,
+		Version:  pr.getVersion(),
 	}
 
 	if newPR.GetHash() != pr.GetHash() {
@@ -300,12 +135,11 @@
 	newNPR := pr.Revision.UpdateChildren(ctx, name, children, branch)
 
 	newPR := &PersistedRevision{
-		Revision:  newNPR,
-		Compress:  pr.Compress,
-		kvStore:   pr.kvStore,
-		events:    pr.events,
-		Version:   pr.getVersion(),
-		isWatched: pr.isWatched,
+		Revision: newNPR,
+		Compress: pr.Compress,
+		kvStore:  pr.kvStore,
+		events:   pr.events,
+		Version:  pr.getVersion(),
 	}
 
 	if newPR.GetHash() != pr.GetHash() {
@@ -325,12 +159,11 @@
 	newNPR := pr.Revision.UpdateAllChildren(ctx, children, branch)
 
 	newPR := &PersistedRevision{
-		Revision:  newNPR,
-		Compress:  pr.Compress,
-		kvStore:   pr.kvStore,
-		events:    pr.events,
-		Version:   pr.getVersion(),
-		isWatched: pr.isWatched,
+		Revision: newNPR,
+		Compress: pr.Compress,
+		kvStore:  pr.kvStore,
+		events:   pr.events,
+		Version:  pr.getVersion(),
 	}
 
 	if newPR.GetHash() != pr.GetHash() {
@@ -357,11 +190,6 @@
 	pr.mutex.Lock()
 	defer pr.mutex.Unlock()
 	if pr.kvStore != nil && txid == "" {
-		if pr.isWatched {
-			pr.kvStore.DeleteWatch(pr.GetName(), pr.events)
-			pr.isWatched = false
-		}
-
 		if err := pr.kvStore.Delete(ctx, pr.GetName()); err != nil {
 			log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
 		} else {
@@ -412,7 +240,6 @@
 			updatedChildRev := childRev.UpdateData(ctx, data, childRev.GetBranch())
 
 			updatedChildRev.getNode().SetProxy(childRev.getNode().GetProxy())
-			updatedChildRev.SetupWatch(ctx, updatedChildRev.GetName())
 			updatedChildRev.SetLastUpdate()
 			updatedChildRev.(*PersistedRevision).setVersion(version)
 
@@ -482,7 +309,6 @@
 
 		// We need to start watching this entry for future changes
 		childRev.SetName(typeName + "/" + keyValue)
-		childRev.SetupWatch(ctx, childRev.GetName())
 		childRev.(*PersistedRevision).setVersion(version)
 
 		// Add entry to cache
diff --git a/db/model/proxy.go b/db/model/proxy.go
index 3ffc9ff..303bc4e 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -26,7 +26,6 @@
 	"strings"
 	"sync"
 
-	"github.com/google/uuid"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 )
 
@@ -191,29 +190,6 @@
 	p.operation = operation
 }
 
-// parseForControlledPath verifies if a proxy path matches a pattern
-// for locations that need to be access controlled.
-func (p *Proxy) parseForControlledPath(path string) (pathLock string, controlled bool) {
-	// TODO: Add other path prefixes that may need control
-	if strings.HasPrefix(path, "/devices") ||
-		strings.HasPrefix(path, "/logical_devices") ||
-		strings.HasPrefix(path, "/adapters") {
-
-		split := strings.SplitN(path, "/", -1)
-		switch len(split) {
-		case 2:
-			controlled = false
-			pathLock = ""
-		case 3:
-			fallthrough
-		default:
-			pathLock = fmt.Sprintf("%s/%s", split[1], split[2])
-			controlled = true
-		}
-	}
-	return pathLock, controlled
-}
-
 // List will retrieve information from the data model at the specified path location
 // A list operation will force access to persistence storage
 func (p *Proxy) List(ctx context.Context, path string, depth int, deep bool, txid string) (interface{}, error) {
@@ -224,17 +200,13 @@
 		effectivePath = p.getFullPath() + path
 	}
 
-	pathLock, controlled := p.parseForControlledPath(effectivePath)
-
 	p.SetOperation(ProxyList)
 	defer p.SetOperation(ProxyNone)
 
 	log.Debugw("proxy-list", log.Fields{
-		"path":       path,
-		"effective":  effectivePath,
-		"pathLock":   pathLock,
-		"controlled": controlled,
-		"operation":  p.GetOperation(),
+		"path":      path,
+		"effective": effectivePath,
+		"operation": p.GetOperation(),
 	})
 	return p.getRoot().List(ctx, path, "", depth, deep, txid)
 }
@@ -248,17 +220,13 @@
 		effectivePath = p.getFullPath() + path
 	}
 
-	pathLock, controlled := p.parseForControlledPath(effectivePath)
-
 	p.SetOperation(ProxyGet)
 	defer p.SetOperation(ProxyNone)
 
 	log.Debugw("proxy-get", log.Fields{
-		"path":       path,
-		"effective":  effectivePath,
-		"pathLock":   pathLock,
-		"controlled": controlled,
-		"operation":  p.GetOperation(),
+		"path":      path,
+		"effective": effectivePath,
+		"operation": p.GetOperation(),
 	})
 
 	return p.getRoot().Get(ctx, path, "", depth, deep, txid)
@@ -280,33 +248,16 @@
 		effectivePath = p.getFullPath() + path
 	}
 
-	pathLock, controlled := p.parseForControlledPath(effectivePath)
-
 	p.SetOperation(ProxyUpdate)
 	defer p.SetOperation(ProxyNone)
 
 	log.Debugw("proxy-update", log.Fields{
-		"path":       path,
-		"effective":  effectivePath,
-		"full":       fullPath,
-		"pathLock":   pathLock,
-		"controlled": controlled,
-		"operation":  p.GetOperation(),
+		"path":      path,
+		"effective": effectivePath,
+		"full":      fullPath,
+		"operation": p.GetOperation(),
 	})
 
-	if p.getRoot().KvStore != nil {
-		if _, err := p.getRoot().KvStore.Client.Reserve(ctx, pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
-			log.Errorw("unable-to-acquire-key-from-kvstore", log.Fields{"error": err})
-			return nil, err
-		}
-		defer func() {
-			err := p.getRoot().KvStore.Client.ReleaseReservation(ctx, pathLock+"_")
-			if err != nil {
-				log.Errorw("Unable to release reservation for key", log.Fields{"error": err})
-			}
-		}()
-	}
-
 	result := p.getRoot().Update(ctx, fullPath, data, strict, txid, nil)
 
 	if result != nil {
@@ -334,33 +285,16 @@
 		effectivePath = p.getFullPath() + path + "/" + id
 	}
 
-	pathLock, controlled := p.parseForControlledPath(effectivePath)
-
 	p.SetOperation(ProxyAdd)
 	defer p.SetOperation(ProxyNone)
 
 	log.Debugw("proxy-add-with-id", log.Fields{
-		"path":       path,
-		"effective":  effectivePath,
-		"full":       fullPath,
-		"pathLock":   pathLock,
-		"controlled": controlled,
-		"operation":  p.GetOperation(),
+		"path":      path,
+		"effective": effectivePath,
+		"full":      fullPath,
+		"operation": p.GetOperation(),
 	})
 
-	if p.getRoot().KvStore != nil {
-		if _, err := p.getRoot().KvStore.Client.Reserve(ctx, pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
-			log.Errorw("unable-to-acquire-key-from-kvstore", log.Fields{"error": err})
-			return nil, err
-		}
-		defer func() {
-			err := p.getRoot().KvStore.Client.ReleaseReservation(ctx, pathLock+"_")
-			if err != nil {
-				log.Errorw("Unable to release reservation for key", log.Fields{"error": err})
-			}
-		}()
-	}
-
 	result := p.getRoot().Add(ctx, fullPath, data, txid, nil)
 
 	if result != nil {
@@ -386,33 +320,16 @@
 		effectivePath = p.getFullPath() + path
 	}
 
-	pathLock, controlled := p.parseForControlledPath(effectivePath)
-
 	p.SetOperation(ProxyAdd)
 	defer p.SetOperation(ProxyNone)
 
 	log.Debugw("proxy-add", log.Fields{
-		"path":       path,
-		"effective":  effectivePath,
-		"full":       fullPath,
-		"pathLock":   pathLock,
-		"controlled": controlled,
-		"operation":  p.GetOperation(),
+		"path":      path,
+		"effective": effectivePath,
+		"full":      fullPath,
+		"operation": p.GetOperation(),
 	})
 
-	if p.getRoot().KvStore != nil {
-		if _, err := p.getRoot().KvStore.Client.Reserve(ctx, pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
-			log.Errorw("unable-to-acquire-key-from-kvstore", log.Fields{"error": err})
-			return nil, err
-		}
-		defer func() {
-			err := p.getRoot().KvStore.Client.ReleaseReservation(ctx, pathLock+"_")
-			if err != nil {
-				log.Errorw("Unable to release reservation for key", log.Fields{"error": err})
-			}
-		}()
-	}
-
 	result := p.getRoot().Add(ctx, fullPath, data, txid, nil)
 
 	if result != nil {
@@ -438,33 +355,16 @@
 		effectivePath = p.getFullPath() + path
 	}
 
-	pathLock, controlled := p.parseForControlledPath(effectivePath)
-
 	p.SetOperation(ProxyRemove)
 	defer p.SetOperation(ProxyNone)
 
 	log.Debugw("proxy-remove", log.Fields{
-		"path":       path,
-		"effective":  effectivePath,
-		"full":       fullPath,
-		"pathLock":   pathLock,
-		"controlled": controlled,
-		"operation":  p.GetOperation(),
+		"path":      path,
+		"effective": effectivePath,
+		"full":      fullPath,
+		"operation": p.GetOperation(),
 	})
 
-	if p.getRoot().KvStore != nil {
-		if _, err := p.getRoot().KvStore.Client.Reserve(ctx, pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
-			log.Errorw("unable-to-acquire-key-from-kvstore", log.Fields{"error": err})
-			return nil, err
-		}
-		defer func() {
-			err := p.getRoot().KvStore.Client.ReleaseReservation(ctx, pathLock+"_")
-			if err != nil {
-				log.Errorw("Unable to release reservation for key", log.Fields{"error": err})
-			}
-		}()
-	}
-
 	result := p.getRoot().Remove(ctx, fullPath, txid, nil)
 
 	if result != nil {
@@ -491,32 +391,16 @@
 		effectivePath = p.getFullPath() + path
 	}
 
-	pathLock, controlled := p.parseForControlledPath(effectivePath)
-
 	p.SetOperation(ProxyCreate)
 	defer p.SetOperation(ProxyNone)
 
 	log.Debugw("proxy-create", log.Fields{
-		"path":       path,
-		"effective":  effectivePath,
-		"full":       fullPath,
-		"pathLock":   pathLock,
-		"controlled": controlled,
-		"operation":  p.GetOperation(),
+		"path":      path,
+		"effective": effectivePath,
+		"full":      fullPath,
+		"operation": p.GetOperation(),
 	})
 
-	if p.getRoot().KvStore != nil {
-		if _, err := p.getRoot().KvStore.Client.Reserve(ctx, pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
-			log.Errorw("unable-to-acquire-key-from-kvstore", log.Fields{"error": err})
-			return nil, err
-		}
-		defer func() {
-			err := p.getRoot().KvStore.Client.ReleaseReservation(ctx, pathLock+"_")
-			if err != nil {
-				log.Errorw("Unable to release reservation for key", log.Fields{"error": err})
-			}
-		}()
-	}
 	return p.getRoot().CreateProxy(ctx, fullPath, exclusive)
 }
 
diff --git a/rw_core/core/common_test.go b/rw_core/core/common_test.go
index 346198d..d07a095 100644
--- a/rw_core/core/common_test.go
+++ b/rw_core/core/common_test.go
@@ -57,6 +57,7 @@
 type isDeviceConditionSatisfied func(ld *voltha.Device) bool
 type isDevicesConditionSatisfied func(ds *voltha.Devices) bool
 type isLogicalDevicesConditionSatisfied func(lds *voltha.LogicalDevices) bool
+type isConditionSatisfied func() bool
 
 func init() {
 	_, err := log.AddPackage(log.JSON, logLevel, log.Fields{"instanceId": "coreTests"})
@@ -193,7 +194,7 @@
 			d, _ := nbi.GetDevice(getContext(), &voltha.ID{Id: oltDeviceID})
 			if d != nil && d.ParentId != "" {
 				ld, _ := nbi.GetLogicalDevice(getContext(), &voltha.ID{Id: d.ParentId})
-				if ld != nil && verificationFunction(ld) {
+				if verificationFunction(ld) {
 					ch <- 1
 					break
 				}
@@ -270,3 +271,29 @@
 		return fmt.Errorf("timeout-waiting-logical-devices")
 	}
 }
+
+func waitUntilCondition(timeout time.Duration, nbi *APIHandler, verificationFunction isConditionSatisfied) error {
+	ch := make(chan int, 1)
+	done := false
+	go func() {
+		for {
+			if verificationFunction() {
+				ch <- 1
+				break
+			}
+			if done {
+				break
+			}
+			time.Sleep(retryInterval)
+		}
+	}()
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+	select {
+	case <-ch:
+		return nil
+	case <-timer.C:
+		done = true
+		return fmt.Errorf("timeout-waiting-for-condition")
+	}
+}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 877d991..42d628a 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -1457,7 +1457,8 @@
 
 	//Remove the associated peer ports on the parent device
 	if err := agent.deviceMgr.deletePeerPorts(ctx, device.ParentId, device.Id); err != nil {
-		return err
+		// At this stage, the parent device may also have been deleted.  Just log and keep processing.
+		log.Warnw("failure-deleting-peer-port", log.Fields{"error": err, "child-device-id": device.Id, "parent-device-id": device.ParentId})
 	}
 
 	if err := agent.adapterProxy.ChildDeviceLost(ctx, agent.deviceType, agent.deviceID, device.ParentPortNo, device.ProxyAddress.OnuId); err != nil {
diff --git a/rw_core/core/grpc_nbi_api_handler_test.go b/rw_core/core/grpc_nbi_api_handler_test.go
index 1abc565..6122b51 100755
--- a/rw_core/core/grpc_nbi_api_handler_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_test.go
@@ -19,7 +19,13 @@
 	"context"
 	"errors"
 	"fmt"
+	"github.com/opencord/voltha-lib-go/v3/pkg/flows"
+	"math/rand"
+	"os"
+	"runtime"
+	"runtime/pprof"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -39,16 +45,19 @@
 )
 
 type NBTest struct {
-	etcdServer     *lm.EtcdServer
-	core           *Core
-	kClient        kafka.Client
-	kvClientPort   int
-	numONUPerOLT   int
-	oltAdapterName string
-	onuAdapterName string
-	coreInstanceID string
-	defaultTimeout time.Duration
-	maxTimeout     time.Duration
+	etcdServer        *lm.EtcdServer
+	core              *Core
+	kClient           kafka.Client
+	kvClientPort      int
+	numONUPerOLT      int
+	startingUNIPortNo int
+	oltAdapter        *cm.OLTAdapter
+	onuAdapter        *cm.ONUAdapter
+	oltAdapterName    string
+	onuAdapterName    string
+	coreInstanceID    string
+	defaultTimeout    time.Duration
+	maxTimeout        time.Duration
 }
 
 func newNBTest() *NBTest {
@@ -64,8 +73,8 @@
 	test.oltAdapterName = "olt_adapter_mock"
 	test.onuAdapterName = "onu_adapter_mock"
 	test.coreInstanceID = "rw-nbi-test"
-	test.defaultTimeout = 10 * time.Second
-	test.maxTimeout = 20 * time.Second
+	test.defaultTimeout = 50 * time.Second
+	test.maxTimeout = 300 * time.Second
 	return test
 }
 
@@ -99,9 +108,10 @@
 	if err != nil {
 		log.Fatalw("setting-mock-olt-adapter-failed", log.Fields{"error": err})
 	}
-	if adapter, ok := (oltAdapter).(*cm.OLTAdapter); ok {
-		nb.numONUPerOLT = adapter.GetNumONUPerOLT()
-	}
+	nb.oltAdapter = (oltAdapter).(*cm.OLTAdapter)
+	nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT()
+	nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo()
+
 	//	Register the adapter
 	registrationData := &voltha.Adapter{
 		Id:      nb.oltAdapterName,
@@ -116,9 +126,12 @@
 	}
 
 	// Setup the mock ONU adapter
-	if _, err := createMockAdapter(OnuAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.onuAdapterName); err != nil {
+	onuAdapter, err := createMockAdapter(OnuAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.onuAdapterName)
+	if err != nil {
 		log.Fatalw("setting-mock-onu-adapter-failed", log.Fields{"error": err})
 	}
+	nb.onuAdapter = (onuAdapter).(*cm.ONUAdapter)
+
 	//	Register the adapter
 	registrationData = &voltha.Adapter{
 		Id:      nb.onuAdapterName,
@@ -191,59 +204,64 @@
 	assert.Nil(t, err)
 	assert.NotNil(t, devices)
 
-	// Wait until devices are in the correct states
+	// A device is ready to be examined when its ADMIN state is ENABLED and OPERATIONAL state is ACTIVE
 	var vFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
 		return device.AdminState == voltha.AdminState_ENABLED && device.OperStatus == voltha.OperStatus_ACTIVE
 	}
-	for _, d := range devices.Items {
-		err = waitUntilDeviceReadiness(d.Id, nb.maxTimeout, vFunction, nbi)
-		assert.Nil(t, err)
-		assert.NotNil(t, d)
-	}
-	// Get the latest device updates as they may have changed since last list devices
-	updatedDevices, err := nbi.ListDevices(getContext(), &empty.Empty{})
-	assert.Nil(t, err)
-	assert.NotNil(t, devices)
-	for _, d := range updatedDevices.Items {
-		assert.Equal(t, voltha.AdminState_ENABLED, d.AdminState)
-		assert.Equal(t, voltha.ConnectStatus_REACHABLE, d.ConnectStatus)
-		assert.Equal(t, voltha.OperStatus_ACTIVE, d.OperStatus)
-		assert.Equal(t, d.Type, d.Adapter)
-		assert.NotEqual(t, "", d.MacAddress)
-		assert.NotEqual(t, "", d.SerialNumber)
 
-		if d.Type == "olt_adapter_mock" {
-			assert.Equal(t, true, d.Root)
-			assert.NotEqual(t, "", d.Id)
-			assert.NotEqual(t, "", d.ParentId)
-			assert.Nil(t, d.ProxyAddress)
-		} else if d.Type == "onu_adapter_mock" {
-			assert.Equal(t, false, d.Root)
-			assert.NotEqual(t, uint32(0), d.Vlan)
-			assert.NotEqual(t, "", d.Id)
-			assert.NotEqual(t, "", d.ParentId)
-			assert.NotEqual(t, "", d.ProxyAddress.DeviceId)
-			assert.Equal(t, "olt_adapter_mock", d.ProxyAddress.DeviceType)
-		} else {
-			assert.Error(t, errors.New("invalid-device-type"))
-		}
-		assert.Equal(t, 2, len(d.Ports))
-		for _, p := range d.Ports {
-			assert.Equal(t, voltha.AdminState_ENABLED, p.AdminState)
-			assert.Equal(t, voltha.OperStatus_ACTIVE, p.OperStatus)
-			if p.Type == voltha.Port_ETHERNET_NNI || p.Type == voltha.Port_ETHERNET_UNI {
-				assert.Equal(t, 0, len(p.Peers))
-			} else if p.Type == voltha.Port_PON_OLT {
-				assert.Equal(t, nb.numONUPerOLT, len(p.Peers))
-				assert.Equal(t, uint32(1), p.PortNo)
-			} else if p.Type == voltha.Port_PON_ONU {
-				assert.Equal(t, 1, len(p.Peers))
-				assert.Equal(t, uint32(1), p.PortNo)
+	var wg sync.WaitGroup
+	for _, device := range devices.Items {
+		wg.Add(1)
+		go func(wg *sync.WaitGroup, device *voltha.Device) {
+			// Wait until the device is in the right state
+			err := waitUntilDeviceReadiness(device.Id, nb.maxTimeout, vFunction, nbi)
+			assert.Nil(t, err)
+
+			// Now, verify the details of the device.  First get the latest update
+			d, err := nbi.GetDevice(getContext(), &voltha.ID{Id: device.Id})
+			assert.Nil(t, err)
+			assert.Equal(t, voltha.AdminState_ENABLED, d.AdminState)
+			assert.Equal(t, voltha.ConnectStatus_REACHABLE, d.ConnectStatus)
+			assert.Equal(t, voltha.OperStatus_ACTIVE, d.OperStatus)
+			assert.Equal(t, d.Type, d.Adapter)
+			assert.NotEqual(t, "", d.MacAddress)
+			assert.NotEqual(t, "", d.SerialNumber)
+
+			if d.Type == "olt_adapter_mock" {
+				assert.Equal(t, true, d.Root)
+				assert.NotEqual(t, "", d.Id)
+				assert.NotEqual(t, "", d.ParentId)
+				assert.Nil(t, d.ProxyAddress)
+			} else if d.Type == "onu_adapter_mock" {
+				assert.Equal(t, false, d.Root)
+				assert.NotEqual(t, uint32(0), d.Vlan)
+				assert.NotEqual(t, "", d.Id)
+				assert.NotEqual(t, "", d.ParentId)
+				assert.NotEqual(t, "", d.ProxyAddress.DeviceId)
+				assert.Equal(t, "olt_adapter_mock", d.ProxyAddress.DeviceType)
 			} else {
-				assert.Error(t, errors.New("invalid-port"))
+				assert.Error(t, errors.New("invalid-device-type"))
 			}
-		}
+			assert.Equal(t, 2, len(d.Ports))
+			for _, p := range d.Ports {
+				assert.Equal(t, voltha.AdminState_ENABLED, p.AdminState)
+				assert.Equal(t, voltha.OperStatus_ACTIVE, p.OperStatus)
+				if p.Type == voltha.Port_ETHERNET_NNI || p.Type == voltha.Port_ETHERNET_UNI {
+					assert.Equal(t, 0, len(p.Peers))
+				} else if p.Type == voltha.Port_PON_OLT {
+					assert.Equal(t, nb.numONUPerOLT, len(p.Peers))
+					assert.Equal(t, uint32(1), p.PortNo)
+				} else if p.Type == voltha.Port_PON_ONU {
+					assert.Equal(t, 1, len(p.Peers))
+					assert.Equal(t, uint32(1), p.PortNo)
+				} else {
+					assert.Error(t, errors.New("invalid-port"))
+				}
+			}
+			wg.Done()
+		}(&wg, device)
 	}
+	wg.Wait()
 }
 
 func (nb *NBTest) getADevice(rootDevice bool, nbi *APIHandler) (*voltha.Device, error) {
@@ -370,6 +388,11 @@
 	err = waitUntilConditionForDevices(5*time.Second, nbi, vdFunction)
 	assert.Nil(t, err)
 
+	// Create a logical device monitor will automatically send trap and eapol flows to the devices being enables
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go nb.monitorLogicalDevice(t, nbi, 1, nb.numONUPerOLT, &wg)
+
 	//	Create the device with valid data
 	oltDevice, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ee"})
 	assert.Nil(t, err)
@@ -401,6 +424,9 @@
 
 	// Verify that the logical device has been setup correctly
 	nb.verifyLogicalDevices(t, oltDevice, nbi)
+
+	// Wait until all flows has been sent to the devices successfully
+	wg.Wait()
 }
 
 func (nb *NBTest) testDisableAndReEnableRootDevice(t *testing.T, nbi *APIHandler) {
@@ -430,6 +456,9 @@
 
 	// Wait for the logical device to satisfy the expected condition
 	var vlFunction isLogicalDeviceConditionSatisfied = func(ld *voltha.LogicalDevice) bool {
+		if ld == nil {
+			return false
+		}
 		for _, lp := range ld.Ports {
 			if (lp.OfpPort.Config&uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
 				lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LINK_DOWN) {
@@ -462,6 +491,9 @@
 
 	// Wait for the logical device to satisfy the expected condition
 	vlFunction = func(ld *voltha.LogicalDevice) bool {
+		if ld == nil {
+			return false
+		}
 		for _, lp := range ld.Ports {
 			if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
 				lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
@@ -617,6 +649,9 @@
 	assert.Nil(t, err)
 	// Wait for the logical device to satisfy the expected condition
 	var vlFunction = func(ld *voltha.LogicalDevice) bool {
+		if ld == nil {
+			return false
+		}
 		for _, lp := range ld.Ports {
 			if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
 				lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
@@ -645,6 +680,9 @@
 	assert.Nil(t, err)
 	// Wait for the logical device to satisfy the expected condition
 	vlFunction = func(ld *voltha.LogicalDevice) bool {
+		if ld == nil {
+			return false
+		}
 		for _, lp := range ld.Ports {
 			if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
 				lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
@@ -672,7 +710,226 @@
 
 }
 
+func makeSimpleFlowMod(fa *flows.FlowArgs) *ofp.OfpFlowMod {
+	matchFields := make([]*ofp.OfpOxmField, 0)
+	for _, val := range fa.MatchFields {
+		matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})
+	}
+	return flows.MkSimpleFlowMod(matchFields, fa.Actions, fa.Command, fa.KV)
+}
+
+func createMetadata(cTag int, techProfile int, port int) uint64 {
+	md := 0
+	md = (md | (cTag & 0xFFFF)) << 16
+	md = (md | (techProfile & 0xFFFF)) << 32
+	return uint64(md | (port & 0xFFFFFFFF))
+}
+
+func (nb *NBTest) verifyLogicalDeviceFlowCount(t *testing.T, nbi *APIHandler, numNNIPorts int, numUNIPorts int) {
+	expectedNumFlows := numNNIPorts*3 + numNNIPorts*numUNIPorts
+	// Wait for logical device to have all the flows
+	var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
+		return lds != nil && len(lds.Items) == 1 && len(lds.Items[0].Flows.Items) == expectedNumFlows
+	}
+	// No timeout implies a success
+	err := waitUntilConditionForLogicalDevices(nb.maxTimeout, nbi, vlFunction)
+	assert.Nil(t, err)
+}
+
+func (nb *NBTest) sendTrapFlows(t *testing.T, nbi *APIHandler, logicalDevice *voltha.LogicalDevice, meterID uint64, startingVlan int) (numNNIPorts, numUNIPorts int) {
+	// Send flows for the parent device
+	var nniPorts []*voltha.LogicalPort
+	var uniPorts []*voltha.LogicalPort
+	for _, p := range logicalDevice.Ports {
+		if p.RootPort {
+			nniPorts = append(nniPorts, p)
+		} else {
+			uniPorts = append(uniPorts, p)
+		}
+	}
+	assert.Equal(t, 1, len(nniPorts))
+	//assert.Greater(t, len(uniPorts), 1 )
+	nniPort := nniPorts[0].OfpPort.PortNo
+	maxInt32 := uint64(0xFFFFFFFF)
+	controllerPortMask := uint32(4294967293) // will result in 4294967293&0x7fffffff => 2147483645 which is the actual controller port
+	var fa *flows.FlowArgs
+	fa = &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(nniPort),
+			flows.EthType(35020),
+		},
+		Actions: []*ofp.OfpAction{
+			flows.Output(controllerPortMask),
+		},
+	}
+	flowLLDP := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
+	_, err := nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowLLDP)
+	assert.Nil(t, err)
+
+	fa = &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(nniPort),
+			flows.EthType(2048),
+			flows.IpProto(17),
+			flows.UdpSrc(67),
+			flows.UdpDst(68),
+		},
+		Actions: []*ofp.OfpAction{
+			flows.Output(controllerPortMask),
+		},
+	}
+	flowIPV4 := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
+	_, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowIPV4)
+	assert.Nil(t, err)
+
+	fa = &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(nniPort),
+			flows.EthType(34525),
+			flows.IpProto(17),
+			flows.UdpSrc(546),
+			flows.UdpDst(547),
+		},
+		Actions: []*ofp.OfpAction{
+			flows.Output(controllerPortMask),
+		},
+	}
+	flowIPV6 := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
+	_, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowIPV6)
+	assert.Nil(t, err)
+
+	return len(nniPorts), len(uniPorts)
+}
+
+func (nb *NBTest) sendEAPFlows(t *testing.T, nbi *APIHandler, logicalDeviceID string, port *ofp.OfpPort, vlan int, meterID uint64) {
+	maxInt32 := uint64(0xFFFFFFFF)
+	controllerPortMask := uint32(4294967293) // will result in 4294967293&0x7fffffff => 2147483645 which is the actual controller port
+	fa := &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1, "write_metadata": createMetadata(vlan, 64, 0), "meter_id": meterID},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(port.PortNo),
+			flows.EthType(34958),
+			flows.VlanVid(8187),
+		},
+		Actions: []*ofp.OfpAction{
+			flows.Output(controllerPortMask),
+		},
+	}
+	flowEAP := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDeviceID}
+	_, err := nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowEAP)
+	assert.Nil(t, err)
+}
+
+func (nb *NBTest) monitorLogicalDevice(t *testing.T, nbi *APIHandler, numNNIPorts int, numUNIPorts int, wg *sync.WaitGroup) {
+	defer wg.Done()
+	if nb.core.logicalDeviceMgr.grpcNbiHdlr != nbi {
+		nb.core.logicalDeviceMgr.setGrpcNbiHandler(nbi)
+	}
+
+	// Clear any existing flows on the adapters
+	nb.oltAdapter.ClearFlows()
+	nb.onuAdapter.ClearFlows()
+
+	// Wait until a logical device is ready
+	var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
+		if lds == nil || len(lds.Items) != 1 {
+			return false
+		}
+		// Ensure there are both NNI ports and at least one UNI port on the logical device
+		ld := lds.Items[0]
+		nniPort := false
+		uniPort := false
+		for _, p := range ld.Ports {
+			nniPort = nniPort || p.RootPort == true
+			uniPort = uniPort || p.RootPort == false
+			if nniPort && uniPort {
+				return true
+			}
+		}
+		return false
+	}
+	err := waitUntilConditionForLogicalDevices(nb.maxTimeout, nbi, vlFunction)
+	assert.Nil(t, err)
+
+	logicalDevices, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
+	assert.Nil(t, err)
+	assert.NotNil(t, logicalDevices)
+	assert.Equal(t, 1, len(logicalDevices.Items))
+
+	logicalDevice := logicalDevices.Items[0]
+	meterID := rand.Uint32()
+
+	// Add a meter to the logical device
+	meterMod := &ofp.OfpMeterMod{
+		Command: ofp.OfpMeterModCommand_OFPMC_ADD,
+		Flags:   rand.Uint32(),
+		MeterId: meterID,
+		Bands: []*ofp.OfpMeterBandHeader{
+			{Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
+				Rate:      rand.Uint32(),
+				BurstSize: rand.Uint32(),
+				Data:      nil,
+			},
+		},
+	}
+	_, err = nbi.UpdateLogicalDeviceMeterTable(getContext(), &ofp.MeterModUpdate{Id: logicalDevice.Id, MeterMod: meterMod})
+	assert.Nil(t, err)
+
+	// Send initial set of Trap flows
+	startingVlan := 4091
+	nb.sendTrapFlows(t, nbi, logicalDevice, uint64(meterID), startingVlan)
+
+	// Listen for port events
+	processedLogicalPorts := 0
+	for event := range nbi.changeEventQueue {
+		startingVlan++
+		if portStatus, ok := (event.Event).(*ofp.ChangeEvent_PortStatus); ok {
+			ps := portStatus.PortStatus
+			if ps.Reason == ofp.OfpPortReason_OFPPR_ADD {
+				processedLogicalPorts++
+				if ps.Desc.PortNo >= uint32(nb.startingUNIPortNo) {
+					nb.sendEAPFlows(t, nbi, logicalDevice.Id, ps.Desc, startingVlan, uint64(meterID))
+				}
+			}
+		}
+		if processedLogicalPorts >= numNNIPorts+numUNIPorts {
+			break
+		}
+	}
+	//Verify the flow count on the logical device
+	nb.verifyLogicalDeviceFlowCount(t, nbi, numNNIPorts, numUNIPorts)
+
+	// Wait until all flows have been sent to the OLT adapters
+	var oltVFunc isConditionSatisfied = func() bool {
+		return nb.oltAdapter.GetFlowCount() >= (numNNIPorts*3)+numNNIPorts*numUNIPorts
+	}
+	err = waitUntilCondition(nb.maxTimeout, nbi, oltVFunc)
+	assert.Nil(t, err)
+
+	// Wait until all flows have been sent to the ONU adapters
+	var onuVFunc isConditionSatisfied = func() bool {
+		return nb.onuAdapter.GetFlowCount() == numUNIPorts
+	}
+	err = waitUntilCondition(nb.maxTimeout, nbi, onuVFunc)
+	assert.Nil(t, err)
+}
+
 func TestSuite1(t *testing.T) {
+	f, err := os.Create("profile.cpu")
+	if err != nil {
+		log.Fatalf("could not create CPU profile: %v\n ", err)
+	}
+	defer f.Close()
+	runtime.SetBlockProfileRate(1)
+	runtime.SetMutexProfileFraction(-1)
+	if err := pprof.StartCPUProfile(f); err != nil {
+		log.Fatalf("could not start CPU profile: %v\n", err)
+	}
+	defer pprof.StopCPUProfile()
+
 	nb := newNBTest()
 	assert.NotNil(t, nb)
 
@@ -695,7 +952,7 @@
 
 	numberOfDeviceTestRuns := 2
 	for i := 1; i <= numberOfDeviceTestRuns; i++ {
-		// 3. Test create device
+		//3. Test create device
 		nb.testCreateDevice(t, nbi)
 
 		// 4. Test Enable a device
@@ -703,6 +960,7 @@
 
 		// 5. Test disable and ReEnable a root device
 		nb.testDisableAndReEnableRootDevice(t, nbi)
+
 		// 6. Test disable and Enable pon port of OLT device
 		nb.testDisableAndEnablePort(t, nbi)
 
diff --git a/rw_core/mocks/adapter_olt.go b/rw_core/mocks/adapter_olt.go
index 2ab98a3..303bae3 100644
--- a/rw_core/mocks/adapter_olt.go
+++ b/rw_core/mocks/adapter_olt.go
@@ -20,6 +20,7 @@
 	"context"
 	"fmt"
 	"strings"
+	"sync"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
@@ -31,19 +32,25 @@
 )
 
 const (
-	numONUPerOLT = 4
+	numONUPerOLT      = 4
+	startingUNIPortNo = 100
 )
 
 // OLTAdapter represent OLT adapter
 type OLTAdapter struct {
+	flows map[uint64]*voltha.OfpFlowStats
+	lock  sync.Mutex
 	Adapter
 }
 
 // NewOLTAdapter - creates OLT adapter instance
 func NewOLTAdapter(cp adapterif.CoreProxy) *OLTAdapter {
-	a := &OLTAdapter{}
-	a.coreProxy = cp
-	return a
+	return &OLTAdapter{
+		flows: map[uint64]*voltha.OfpFlowStats{},
+		Adapter: Adapter{
+			coreProxy: cp,
+		},
+	}
 }
 
 // Adopt_device creates new handler for added device
@@ -97,7 +104,7 @@
 		}
 
 		// Register Child devices
-		initialUniPortNo := 100
+		initialUniPortNo := startingUNIPortNo
 		for i := 0; i < numONUPerOLT; i++ {
 			go func(seqNo int) {
 				if _, err := oltA.coreProxy.ChildDeviceDetected(
@@ -168,6 +175,11 @@
 	return numONUPerOLT
 }
 
+// Returns the starting UNI port number
+func (oltA *OLTAdapter) GetStartingUNIPortNo() int {
+	return startingUNIPortNo
+}
+
 // Disable_device disables device
 func (oltA *OLTAdapter) Disable_device(device *voltha.Device) error { // nolint
 	go func() {
@@ -261,3 +273,37 @@
 func (oltA *OLTAdapter) Child_device_lost(deviceID string, pPortNo uint32, onuID uint32) error { // nolint
 	return nil
 }
+
+// Update_flows_incrementally mocks the incremental flow update
+func (oltA *OLTAdapter) Update_flows_incrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error { // nolint
+	oltA.lock.Lock()
+	defer oltA.lock.Unlock()
+
+	if flows.ToAdd != nil {
+		for _, f := range flows.ToAdd.Items {
+			oltA.flows[f.Id] = f
+		}
+	}
+	if flows.ToRemove != nil {
+		for _, f := range flows.ToRemove.Items {
+			delete(oltA.flows, f.Id)
+		}
+	}
+	return nil
+}
+
+// GetFlowCount returns the total number of flows presently under this adapter
+func (oltA *OLTAdapter) GetFlowCount() int {
+	oltA.lock.Lock()
+	defer oltA.lock.Unlock()
+
+	return len(oltA.flows)
+}
+
+// ClearFlows removes all flows in this adapter
+func (oltA *OLTAdapter) ClearFlows() {
+	oltA.lock.Lock()
+	defer oltA.lock.Unlock()
+
+	oltA.flows = map[uint64]*voltha.OfpFlowStats{}
+}
diff --git a/rw_core/mocks/adapter_onu.go b/rw_core/mocks/adapter_onu.go
index ea02210..73ee749 100644
--- a/rw_core/mocks/adapter_onu.go
+++ b/rw_core/mocks/adapter_onu.go
@@ -20,6 +20,7 @@
 	"context"
 	"fmt"
 	"strings"
+	"sync"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
@@ -32,15 +33,19 @@
 
 // ONUAdapter represent ONU adapter attributes
 type ONUAdapter struct {
-	coreProxy adapterif.CoreProxy
+	flows map[uint64]*voltha.OfpFlowStats
+	lock  sync.Mutex
 	Adapter
 }
 
 // NewONUAdapter creates ONU adapter
 func NewONUAdapter(cp adapterif.CoreProxy) *ONUAdapter {
-	a := &ONUAdapter{}
-	a.coreProxy = cp
-	return a
+	return &ONUAdapter{
+		flows: map[uint64]*voltha.OfpFlowStats{},
+		Adapter: Adapter{
+			coreProxy: cp,
+		},
+	}
 }
 
 // Adopt_device creates new handler for added device
@@ -200,3 +205,37 @@
 	}()
 	return nil
 }
+
+// Update_flows_incrementally mocks the incremental flow update
+func (onuA *ONUAdapter) Update_flows_incrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error { // nolint
+	onuA.lock.Lock()
+	defer onuA.lock.Unlock()
+
+	if flows.ToAdd != nil {
+		for _, f := range flows.ToAdd.Items {
+			onuA.flows[f.Id] = f
+		}
+	}
+	if flows.ToRemove != nil {
+		for _, f := range flows.ToRemove.Items {
+			delete(onuA.flows, f.Id)
+		}
+	}
+	return nil
+}
+
+// GetFlowCount returns the total number of flows presently under this adapter
+func (onuA *ONUAdapter) GetFlowCount() int {
+	onuA.lock.Lock()
+	defer onuA.lock.Unlock()
+
+	return len(onuA.flows)
+}
+
+// ClearFlows removes all flows in this adapter
+func (onuA *ONUAdapter) ClearFlows() {
+	onuA.lock.Lock()
+	defer onuA.lock.Unlock()
+
+	onuA.flows = map[uint64]*voltha.OfpFlowStats{}
+}