VOL-1689 : ONU stays in DISCOVERED state
VOL-1586 : Possible race condition in openolt python adapter during onu discovery

1) Gets the Device in the response of ChildDeviceDetected.
This avoids the race and also removes the need for GetChildDevice.
2) Puts the Device Id into a cache for use in future requests,
especially to avoid the failure when calling GetChildDevice
in onuIndication because of the race.

Change-Id: I60944a6ee0e2ffad80a31ef93f72b55b0b136284
diff --git a/vendor/github.com/opencord/voltha-go/adapters/common/core_proxy.go b/vendor/github.com/opencord/voltha-go/adapters/common/core_proxy.go
index c4b5bf5..8371e09 100644
--- a/vendor/github.com/opencord/voltha-go/adapters/common/core_proxy.go
+++ b/vendor/github.com/opencord/voltha-go/adapters/common/core_proxy.go
@@ -231,7 +231,7 @@
 }
 
 func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
-	childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) error {
+	childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) (*voltha.Device, error) {
 	log.Debugw("ChildDeviceDetected", log.Fields{"pDeviceId": parentDeviceId, "channelId": channelId})
 	rpc := "ChildDeviceDetected"
 	// Use a device specific topic to send the request.  The adapter handling the device creates a device
@@ -278,7 +278,24 @@
 
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
 	log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
-	return unPackResponse(rpc, parentDeviceId, success, result)
+
+	if success {
+		volthaDevice := &voltha.Device{}
+		if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
+			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+			return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+		}
+		return volthaDevice, nil
+	} else {
+		unpackResult := &ic.Error{}
+		var err error
+		if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+		}
+		log.Debugw("ChildDeviceDetected-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
+		// TODO: Need to get the real error code
+		return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+	}
 
 }
 
@@ -479,6 +496,6 @@
 		Value: pkt,
 	}
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
-	log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": deviceId, "success": success})
+	log.Debugw("SendPacketIn-response", log.Fields{"pDeviceId": deviceId, "success": success})
 	return unPackResponse(rpc, deviceId, success, result)
 }
diff --git a/vendor/github.com/opencord/voltha-go/adapters/common/request_handler.go b/vendor/github.com/opencord/voltha-go/adapters/common/request_handler.go
index 3ac5c4f..7ce4414 100644
--- a/vendor/github.com/opencord/voltha-go/adapters/common/request_handler.go
+++ b/vendor/github.com/opencord/voltha-go/adapters/common/request_handler.go
@@ -183,7 +183,42 @@
 }
 
 func (rhp *RequestHandlerProxy) Reboot_device(args []*ic.Argument) (*empty.Empty, error) {
+	if len(args) < 3 {
+		log.Warn("invalid-number-of-args", log.Fields{"args": args})
+		err := errors.New("invalid-number-of-args")
+		return nil, err
+	}
+
+	device := &voltha.Device{}
+	transactionID := &ic.StrType{}
+	fromTopic := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device":
+			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+				log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.FromTopic:
+			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+				log.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	//Update the core reference for that device
+	rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
+	//Invoke the Reboot_device API on the adapter
+	if err := rhp.adapter.Reboot_device(device); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
 	return new(empty.Empty), nil
+
 }
 
 func (rhp *RequestHandlerProxy) Self_test_device(args []*ic.Argument) (*empty.Empty, error) {
@@ -233,6 +268,45 @@
 }
 
 func (rhp *RequestHandlerProxy) Update_flows_bulk(args []*ic.Argument) (*empty.Empty, error) {
+	log.Debug("Update_flows_bulk")
+	if len(args) < 4 {
+		log.Warn("Update_flows_bulk-invalid-number-of-args", log.Fields{"args": args})
+		err := errors.New("invalid-number-of-args")
+		return nil, err
+	}
+	device := &voltha.Device{}
+	transactionID := &ic.StrType{}
+	flows := &voltha.Flows{}
+	groups := &voltha.FlowGroups{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device":
+			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+				log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+				return nil, err
+			}
+		case "flows":
+			if err := ptypes.UnmarshalAny(arg.Value, flows); err != nil {
+				log.Warnw("cannot-unmarshal-flows", log.Fields{"error": err})
+				return nil, err
+			}
+		case "groups":
+			if err := ptypes.UnmarshalAny(arg.Value, groups); err != nil {
+				log.Warnw("cannot-unmarshal-groups", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	log.Debugw("Update_flows_bulk", log.Fields{"flows": flows, "groups": groups})
+	//Invoke the adopt device on the adapter
+	if err := rhp.adapter.Update_flows_bulk(device, flows, groups); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
 	return new(empty.Empty), nil
 }
 
diff --git a/vendor/github.com/opencord/voltha-go/db/model/node.go b/vendor/github.com/opencord/voltha-go/db/model/node.go
index 15ea04e..207df09 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/node.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/node.go
@@ -61,9 +61,10 @@
 }
 
 type node struct {
-	mutex     sync.RWMutex
-	Root      *root
-	Type      interface{}
+	mutex sync.RWMutex
+	Root  *root
+	Type  interface{}
+
 	Branches  map[string]*Branch
 	Tags      map[string]Revision
 	Proxy     *Proxy
@@ -325,7 +326,11 @@
 		if entry, exists := GetRevCache().Cache.Load(path); exists && entry.(Revision) != nil {
 			entryAge := time.Now().Sub(entry.(Revision).GetLastUpdate()).Nanoseconds() / int64(time.Millisecond)
 			if entryAge < DATA_REFRESH_PERIOD {
-				log.Debugw("using-cache-entry", log.Fields{"path": path, "hash": hash, "age": entryAge})
+				log.Debugw("using-cache-entry", log.Fields{
+					"path": path,
+					"hash": hash,
+					"age":  entryAge,
+				})
 				return proto.Clone(entry.(Revision).GetData().(proto.Message))
 			} else {
 				log.Debugw("cache-entry-expired", log.Fields{"path": path, "hash": hash, "age": entryAge})
@@ -950,6 +955,13 @@
 }
 
 func (n *node) createProxy(path string, fullPath string, parentNode *node, exclusive bool) *Proxy {
+	log.Debugw("node-create-proxy", log.Fields{
+		"node-type":        reflect.ValueOf(n.Type).Type(),
+		"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+		"path":             path,
+		"fullPath":         fullPath,
+	})
+
 	for strings.HasPrefix(path, "/") {
 		path = path[1:]
 	}
@@ -960,48 +972,130 @@
 	rev := n.GetBranch(NONE).GetLatest()
 	partition := strings.SplitN(path, "/", 2)
 	name := partition[0]
+	var nodeType interface{}
+	// Node type is chosen depending on if we have reached the end of path or not
 	if len(partition) < 2 {
 		path = ""
+		nodeType = n.Type
 	} else {
 		path = partition[1]
+		nodeType = parentNode.Type
 	}
 
-	field := ChildrenFields(n.Type)[name]
-	if field.IsContainer {
-		if path == "" {
-			//log.Error("cannot proxy a container field")
-			newNode := n.MakeNode(reflect.New(field.ClassType.Elem()).Interface(), "")
-			return newNode.makeProxy(path, fullPath, parentNode, exclusive)
-		} else if field.Key != "" {
-			partition := strings.SplitN(path, "/", 2)
-			key := partition[0]
-			if len(partition) < 2 {
-				path = ""
+	field := ChildrenFields(nodeType)[name]
+
+	if field != nil {
+		if field.IsContainer {
+			log.Debugw("container-field", log.Fields{
+				"node-type":        reflect.ValueOf(n.Type).Type(),
+				"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+				"path":             path,
+				"name":             name,
+			})
+			if path == "" {
+				log.Debugw("folder-proxy", log.Fields{
+					"node-type":        reflect.ValueOf(n.Type).Type(),
+					"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+					"fullPath":         fullPath,
+					"name":             name,
+				})
+				newNode := n.MakeNode(reflect.New(field.ClassType.Elem()).Interface(), "")
+				return newNode.makeProxy(path, fullPath, parentNode, exclusive)
+			} else if field.Key != "" {
+				log.Debugw("key-proxy", log.Fields{
+					"node-type":        reflect.ValueOf(n.Type).Type(),
+					"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+					"fullPath":         fullPath,
+					"name":             name,
+				})
+				partition := strings.SplitN(path, "/", 2)
+				key := partition[0]
+				if len(partition) < 2 {
+					path = ""
+				} else {
+					path = partition[1]
+				}
+				keyValue := field.KeyFromStr(key)
+				var children []Revision
+				children = make([]Revision, len(rev.GetChildren(name)))
+				copy(children, rev.GetChildren(name))
+
+				// Try to find a matching revision in memory
+				// If not found try the db
+				var childRev Revision
+				if _, childRev = n.findRevByKey(children, field.Key, keyValue); childRev != nil {
+					log.Debugw("found-revision-matching-key-in-memory", log.Fields{
+						"node-type":        reflect.ValueOf(n.Type).Type(),
+						"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+						"fullPath":         fullPath,
+						"name":             name,
+					})
+				} else if revs := n.GetBranch(NONE).GetLatest().LoadFromPersistence(fullPath, "", nil); revs != nil && len(revs) > 0 {
+					log.Debugw("found-revision-matching-key-in-db", log.Fields{
+						"node-type":        reflect.ValueOf(n.Type).Type(),
+						"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+						"fullPath":         fullPath,
+						"name":             name,
+					})
+					childRev = revs[0]
+				} else {
+					log.Debugw("no-revision-matching-key", log.Fields{
+						"node-type":        reflect.ValueOf(n.Type).Type(),
+						"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+						"fullPath":         fullPath,
+						"name":             name,
+					})
+				}
+				if childRev != nil {
+					childNode := childRev.GetNode()
+					return childNode.createProxy(path, fullPath, n, exclusive)
+				}
 			} else {
-				path = partition[1]
-			}
-			keyValue := field.KeyFromStr(key)
-			var children []Revision
-			children = make([]Revision, len(rev.GetChildren(name)))
-			copy(children, rev.GetChildren(name))
-			if _, childRev := n.findRevByKey(children, field.Key, keyValue); childRev != nil {
-				childNode := childRev.GetNode()
-				return childNode.createProxy(path, fullPath, n, exclusive)
+				log.Errorw("cannot-access-index-of-empty-container", log.Fields{
+					"node-type":        reflect.ValueOf(n.Type).Type(),
+					"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+					"path":             path,
+					"name":             name,
+				})
 			}
 		} else {
-			log.Error("cannot index into container with no keys")
+			log.Debugw("non-container-field", log.Fields{
+				"node-type":        reflect.ValueOf(n.Type).Type(),
+				"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+				"path":             path,
+				"name":             name,
+			})
+			childRev := rev.GetChildren(name)[0]
+			childNode := childRev.GetNode()
+			return childNode.createProxy(path, fullPath, n, exclusive)
 		}
 	} else {
-		childRev := rev.GetChildren(name)[0]
-		childNode := childRev.GetNode()
-		return childNode.createProxy(path, fullPath, n, exclusive)
+		log.Debugw("field-object-is-nil", log.Fields{
+			"node-type":        reflect.ValueOf(n.Type).Type(),
+			"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+			"fullPath":         fullPath,
+			"name":             name,
+		})
 	}
 
-	log.Warnf("Cannot create proxy - latest rev:%s, all revs:%+v", rev.GetHash(), n.GetBranch(NONE).Revisions)
+	log.Warnw("cannot-create-proxy", log.Fields{
+		"node-type":        reflect.ValueOf(n.Type).Type(),
+		"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+		"path":             path,
+		"fullPath":         fullPath,
+		"latest-rev":       rev.GetHash(),
+	})
 	return nil
 }
 
 func (n *node) makeProxy(path string, fullPath string, parentNode *node, exclusive bool) *Proxy {
+	log.Debugw("node-make-proxy", log.Fields{
+		"node-type":        reflect.ValueOf(n.Type).Type(),
+		"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+		"path":             path,
+		"fullPath":         fullPath,
+	})
+
 	r := &root{
 		node:                  n,
 		Callbacks:             n.Root.GetCallbacks(),
@@ -1013,8 +1107,20 @@
 	}
 
 	if n.Proxy == nil {
+		log.Debugw("constructing-new-proxy", log.Fields{
+			"node-type":        reflect.ValueOf(n.Type).Type(),
+			"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+			"path":             path,
+			"fullPath":         fullPath,
+		})
 		n.Proxy = NewProxy(r, n, parentNode, path, fullPath, exclusive)
 	} else {
+		log.Debugw("node-has-existing-proxy", log.Fields{
+			"node-type":        reflect.ValueOf(n.Proxy.Node.Type).Type(),
+			"parent-node-type": reflect.ValueOf(n.Proxy.ParentNode.Type).Type(),
+			"path":             n.Proxy.Path,
+			"fullPath":         n.Proxy.FullPath,
+		})
 		if n.Proxy.Exclusive {
 			log.Error("node is already owned exclusively")
 		}
diff --git a/vendor/github.com/opencord/voltha-go/db/model/non_persisted_revision.go b/vendor/github.com/opencord/voltha-go/db/model/non_persisted_revision.go
index 6075b3f..297a740 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/non_persisted_revision.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/non_persisted_revision.go
@@ -55,7 +55,6 @@
 	Branch       *Branch
 	WeakRef      string
 	Name         string
-	discarded    bool
 	lastUpdate   time.Time
 }
 
@@ -66,14 +65,9 @@
 	r.Config = NewDataRevision(root, data)
 	r.Children = children
 	r.Hash = r.hashContent()
-	r.discarded = false
 	return r
 }
 
-func (npr *NonPersistedRevision) IsDiscarded() bool {
-	return npr.discarded
-}
-
 func (npr *NonPersistedRevision) SetConfig(config *DataRevision) {
 	npr.mutex.Lock()
 	defer npr.mutex.Unlock()
@@ -356,14 +350,33 @@
 			if nameExists {
 				// Check if the data has changed or not
 				if existingChildren[nameIndex].GetData().(proto.Message).String() != newChild.GetData().(proto.Message).String() {
+					log.Debugw("replacing-existing-child", log.Fields{
+						"old-hash": existingChildren[nameIndex].GetHash(),
+						"old-data": existingChildren[nameIndex].GetData(),
+						"new-hash": newChild.GetHash(),
+						"new-data": newChild.GetData(),
+					})
+
 					// replace entry
 					newChild.GetNode().Root = existingChildren[nameIndex].GetNode().Root
 					updatedChildren = append(updatedChildren, newChild)
 				} else {
+					log.Debugw("keeping-existing-child", log.Fields{
+						"old-hash": existingChildren[nameIndex].GetHash(),
+						"old-data": existingChildren[nameIndex].GetData(),
+						"new-hash": newChild.GetHash(),
+						"new-data": newChild.GetData(),
+					})
+
 					// keep existing entry
 					updatedChildren = append(updatedChildren, existingChildren[nameIndex])
 				}
 			} else {
+				log.Debugw("adding-unknown-child", log.Fields{
+					"hash": newChild.GetHash(),
+					"data": newChild.GetData(),
+				})
+
 				// new entry ... just add it
 				updatedChildren = append(updatedChildren, newChild)
 			}
@@ -413,7 +426,6 @@
 // Drop is used to indicate when a revision is no longer required
 func (npr *NonPersistedRevision) Drop(txid string, includeConfig bool) {
 	log.Debugw("dropping-revision", log.Fields{"hash": npr.GetHash(), "name": npr.GetName()})
-	npr.discarded = true
 }
 
 // ChildDrop will remove a child entry matching the provided parameters from the current revision
@@ -458,6 +470,6 @@
 	// stub ... required by interface
 }
 
-func (pr *NonPersistedRevision) StorageDrop(txid string, includeConfig bool) {
+func (npr *NonPersistedRevision) StorageDrop(txid string, includeConfig bool) {
 	// stub ... required by interface
 }
diff --git a/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go b/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go
index 226fc3c..2ab91b7 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go
@@ -127,34 +127,31 @@
 
 StopWatchLoop:
 	for {
-		if pr.IsDiscarded() {
-			break StopWatchLoop
-		}
+		latestRev := pr.GetBranch().GetLatest()
 
 		select {
 		case event, ok := <-pr.events:
 			if !ok {
-				log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
+				log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
 				break StopWatchLoop
 			}
-
-			log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": pr.GetName()})
+			log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": latestRev.GetName()})
 
 			switch event.EventType {
 			case kvstore.DELETE:
-				log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
+				log.Debugw("delete-from-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
 				pr.Revision.Drop("", true)
 				break StopWatchLoop
 
 			case kvstore.PUT:
-				log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
+				log.Debugw("update-in-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
 
-				data := reflect.New(reflect.TypeOf(pr.GetData()).Elem())
+				data := reflect.New(reflect.TypeOf(latestRev.GetData()).Elem())
 
 				if err := proto.Unmarshal(event.Value.([]byte), data.Interface().(proto.Message)); err != nil {
-					log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
+					log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "error": err})
 				} else {
-					log.Debugw("un-marshaled-watch-data", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "data": data.Interface()})
+					log.Debugw("un-marshaled-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "data": data.Interface()})
 
 					var pathLock string
 					var pac *proxyAccessControl
@@ -171,24 +168,26 @@
 						Lease:   0,
 					}
 
-					if pr.GetNode().GetProxy() != nil {
+					if latestRev.GetNode().GetProxy() != nil {
 						//
 						// If a proxy exists for this revision, use it to lock access to the path
 						// and prevent simultaneous updates to the object in memory
 						//
-						pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
+						pathLock, _ = latestRev.GetNode().GetProxy().parseForControlledPath(latestRev.GetNode().GetProxy().getFullPath())
 
 						//If the proxy already has a request in progress, then there is no need to process the watch
-						log.Debugw("checking-if-path-is-locked", log.Fields{"key": pr.GetHash(), "pathLock": pathLock})
+						log.Debugw("checking-if-path-is-locked", log.Fields{"key": latestRev.GetHash(), "pathLock": pathLock})
 						if PAC().IsReserved(pathLock) {
 							log.Debugw("operation-in-progress", log.Fields{
-								"key":       pr.GetHash(),
-								"path":      pr.GetNode().GetProxy().getFullPath(),
-								"operation": pr.GetNode().GetProxy().Operation.String(),
+								"key":       latestRev.GetHash(),
+								"path":      latestRev.GetNode().GetProxy().getFullPath(),
+								"operation": latestRev.GetNode().GetProxy().Operation.String(),
 							})
 
+							//continue
+
 							// Identify the operation type and determine if the watch event should be applied or not.
-							switch pr.GetNode().GetProxy().Operation {
+							switch latestRev.GetNode().GetProxy().Operation {
 							case PROXY_REMOVE:
 								fallthrough
 
@@ -200,9 +199,9 @@
 								// Therefore, the data of the current event is most likely out-dated
 								// and should be ignored
 								log.Debugw("ignore-watch-event", log.Fields{
-									"key":       pr.GetHash(),
-									"path":      pr.GetNode().GetProxy().getFullPath(),
-									"operation": pr.GetNode().GetProxy().Operation.String(),
+									"key":       latestRev.GetHash(),
+									"path":      latestRev.GetNode().GetProxy().getFullPath(),
+									"operation": latestRev.GetNode().GetProxy().Operation.String(),
 								})
 
 								continue
@@ -216,41 +215,45 @@
 							case PROXY_GET:
 								fallthrough
 
+							case PROXY_WATCH:
+								fallthrough
+
 							default:
 								log.Debugw("process-watch-event", log.Fields{
-									"key":       pr.GetHash(),
-									"path":      pr.GetNode().GetProxy().getFullPath(),
-									"operation": pr.GetNode().GetProxy().Operation.String(),
+									"key":       latestRev.GetHash(),
+									"path":      latestRev.GetNode().GetProxy().getFullPath(),
+									"operation": latestRev.GetNode().GetProxy().Operation.String(),
 								})
 							}
 						}
 
 						// Reserve the path to prevent others to modify while we reload from persistence
-						log.Debugw("reserve-and-lock-path", log.Fields{"key": pr.GetHash(), "path": pathLock})
-						pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
+						log.Debugw("reserve-and-lock-path", log.Fields{"key": latestRev.GetHash(), "path": pathLock})
+						pac = PAC().ReservePath(latestRev.GetNode().GetProxy().getFullPath(),
+							latestRev.GetNode().GetProxy(), pathLock)
 						pac.lock()
-						pr.GetNode().GetProxy().Operation = PROXY_UPDATE
-						pac.SetProxy(pr.GetNode().GetProxy())
+						latestRev.GetNode().GetProxy().Operation = PROXY_WATCH
+						pac.SetProxy(latestRev.GetNode().GetProxy())
 
 						// Load changes and apply to memory
-						pr.LoadFromPersistence(pr.GetName(), "", blobs)
+						latestRev.LoadFromPersistence(latestRev.GetName(), "", blobs)
 
-						log.Debugw("release-and-unlock-path", log.Fields{"key": pr.GetHash(), "path": pathLock})
+						log.Debugw("release-and-unlock-path", log.Fields{"key": latestRev.GetHash(), "path": pathLock})
 						pac.getProxy().Operation = PROXY_GET
 						pac.unlock()
 						PAC().ReleasePath(pathLock)
 
 					} else {
 						// This block should be reached only if coming from a non-proxied request
-						log.Debugw("revision-with-no-proxy", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
+						log.Debugw("revision-with-no-proxy", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
 
 						// Load changes and apply to memory
-						pr.LoadFromPersistence(pr.GetName(), "", blobs)
+						latestRev.LoadFromPersistence(latestRev.GetName(), "", blobs)
 					}
 				}
 
 			default:
-				log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "type": event.EventType})
+				log.Debugw("unhandled-event", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "type": event.EventType})
 			}
 		}
 	}
@@ -267,19 +270,18 @@
 	newNPR := pr.Revision.UpdateData(data, branch)
 
 	newPR := &PersistedRevision{
-		Revision: newNPR,
-		Compress: pr.Compress,
-		kvStore:  pr.kvStore,
-		events:   pr.events,
+		Revision:  newNPR,
+		Compress:  pr.Compress,
+		kvStore:   pr.kvStore,
+		events:    pr.events,
+		isWatched: pr.isWatched,
 	}
 
 	if newPR.GetHash() != pr.GetHash() {
-		newPR.isWatched = false
 		newPR.isStored = false
 		pr.Drop(branch.Txid, false)
 		pr.Drop(branch.Txid, false)
 	} else {
-		newPR.isWatched = true
 		newPR.isStored = true
 	}
 
@@ -294,18 +296,17 @@
 	newNPR := pr.Revision.UpdateChildren(name, children, branch)
 
 	newPR := &PersistedRevision{
-		Revision: newNPR,
-		Compress: pr.Compress,
-		kvStore:  pr.kvStore,
-		events:   pr.events,
+		Revision:  newNPR,
+		Compress:  pr.Compress,
+		kvStore:   pr.kvStore,
+		events:    pr.events,
+		isWatched: pr.isWatched,
 	}
 
 	if newPR.GetHash() != pr.GetHash() {
-		newPR.isWatched = false
 		newPR.isStored = false
 		pr.Drop(branch.Txid, false)
 	} else {
-		newPR.isWatched = true
 		newPR.isStored = true
 	}
 
@@ -319,18 +320,17 @@
 	newNPR := pr.Revision.UpdateAllChildren(children, branch)
 
 	newPR := &PersistedRevision{
-		Revision: newNPR,
-		Compress: pr.Compress,
-		kvStore:  pr.kvStore,
-		events:   pr.events,
+		Revision:  newNPR,
+		Compress:  pr.Compress,
+		kvStore:   pr.kvStore,
+		events:    pr.events,
+		isWatched: pr.isWatched,
 	}
 
 	if newPR.GetHash() != pr.GetHash() {
-		newPR.isWatched = false
 		newPR.isStored = false
 		pr.Drop(branch.Txid, false)
 	} else {
-		newPR.isWatched = true
 		newPR.isStored = true
 	}
 
@@ -388,6 +388,7 @@
 	if childIdx, childRev := pr.GetNode().findRevByKey(children, keyName, keyValue); childRev != nil {
 		// A child matching the provided key exists in memory
 		// Verify if the data differs from what was retrieved from persistence
+		// Also check if we are treating a newer revision of the data or not
 		if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() {
 			log.Debugw("revision-data-is-different", log.Fields{
 				"key":  childRev.GetHash(),
@@ -407,11 +408,11 @@
 
 			updatedChildRev.GetNode().SetProxy(childRev.GetNode().GetProxy())
 			updatedChildRev.SetupWatch(updatedChildRev.GetName())
-			childRev.Drop(txid, false)
 			updatedChildRev.SetLastUpdate()
 
 			// Update cache
 			GetRevCache().Cache.Store(updatedChildRev.GetName(), updatedChildRev)
+			childRev.Drop(txid, false)
 
 			childRev.GetBranch().LatestLock.Unlock()
 			// END lock child
@@ -459,6 +460,7 @@
 				response = childRev
 			}
 		}
+
 	} else {
 		// There is no available child with that key value.
 		// Create a new child and update the parent revision.
@@ -478,7 +480,6 @@
 		childRev.SetName(typeName + "/" + keyValue)
 		childRev.SetupWatch(childRev.GetName())
 
-		pr.GetBranch().Node.makeLatest(pr.GetBranch(), childRev, nil)
 		pr.GetBranch().LatestLock.Unlock()
 		// END child lock
 
@@ -491,7 +492,6 @@
 		children = append(children, childRev)
 		updatedRev := parent.GetBranch(NONE).Latest.UpdateChildren(typeName, children, parent.GetBranch(NONE))
 		updatedRev.GetNode().SetProxy(parent.GetBranch(NONE).Node.GetProxy())
-
 		parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
 		parent.GetBranch(NONE).LatestLock.Unlock()
 		// END parent lock
@@ -512,8 +512,7 @@
 
 // LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
 // by adding missing entries, updating changed entries and ignoring unchanged ones
-func (pr *PersistedRevision) LoadFromPersistence(
-	path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
+func (pr *PersistedRevision) LoadFromPersistence(path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
 	pr.mutex.Lock()
 	defer pr.mutex.Unlock()
 
@@ -575,8 +574,7 @@
 						// based on the field's key attribute
 						_, key := GetAttributeValue(data.Interface(), field.Key, 0)
 
-						if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(),
-							txid); entry != nil {
+						if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(), txid); entry != nil {
 							response = append(response, entry)
 						}
 					} else {
@@ -603,8 +601,7 @@
 					}
 					keyValue := field.KeyFromStr(key)
 
-					if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string),
-						txid); entry != nil {
+					if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string), txid); entry != nil {
 						response = append(response, entry)
 					}
 				}
diff --git a/vendor/github.com/opencord/voltha-go/db/model/proxy.go b/vendor/github.com/opencord/voltha-go/db/model/proxy.go
index d4a86f4..182dcdd 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/proxy.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/proxy.go
@@ -100,8 +100,12 @@
 
 // getCallbacks returns the full list of callbacks associated to the proxy
 func (p *Proxy) getCallbacks(callbackType CallbackType) map[string]*CallbackTuple {
-	if cb, exists := p.Callbacks[callbackType]; exists {
-		return cb
+	if p != nil {
+		if cb, exists := p.Callbacks[callbackType]; exists {
+			return cb
+		}
+	} else {
+		log.Debugw("proxy-is-nil", log.Fields{"callback-type": callbackType.String()})
 	}
 	return nil
 }
@@ -148,6 +152,7 @@
 	PROXY_UPDATE
 	PROXY_REMOVE
 	PROXY_CREATE
+	PROXY_WATCH
 )
 
 var proxyOperationTypes = []string{
@@ -157,6 +162,7 @@
 	"PROXY_UPDATE",
 	"PROXY_REMOVE",
 	"PROXY_CREATE",
+	"PROXY_WATCH",
 }
 
 func (t ProxyOperation) String() string {
diff --git a/vendor/github.com/opencord/voltha-go/db/model/proxy_access_control.go b/vendor/github.com/opencord/voltha-go/db/model/proxy_access_control.go
index 2a5d034..a1ea6be 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/proxy_access_control.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/proxy_access_control.go
@@ -255,7 +255,7 @@
 		defer log.Debugw("unlocked-access--create-proxy", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
 	}
 
-	result := pac.getProxy().GetRoot().CreateProxy(path, exclusive)
+	result := pac.getProxy().ParentNode.CreateProxy(path, exclusive)
 
 	if result != nil {
 		return result
diff --git a/vendor/github.com/opencord/voltha-go/db/model/revision.go b/vendor/github.com/opencord/voltha-go/db/model/revision.go
index 4e606f1..cd4c5df 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/revision.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/revision.go
@@ -22,7 +22,6 @@
 
 type Revision interface {
 	Finalize(bool)
-	IsDiscarded() bool
 	SetConfig(revision *DataRevision)
 	GetConfig() *DataRevision
 	Drop(txid string, includeConfig bool)
diff --git a/vendor/github.com/opencord/voltha-go/db/model/root.go b/vendor/github.com/opencord/voltha-go/db/model/root.go
index 338ef67..5036ce1 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/root.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/root.go
@@ -91,7 +91,7 @@
 		dirtyNode.DeleteBranch(txid)
 	}
 	delete(r.DirtyNodes, txid)
-	delete(r.node.Branches, txid)
+	r.node.DeleteBranch(txid)
 }
 
 // FoldTxBranch will merge the contents of a transaction branch with the root object
@@ -111,9 +111,8 @@
 // ExecuteCallbacks will invoke all the callbacks linked to root object
 func (r *root) ExecuteCallbacks() {
 	r.mutex.Lock()
-	log.Debugf("ExecuteCallbacks has the ROOT lock : %+v", r)
 	defer r.mutex.Unlock()
-	defer log.Debugf("ExecuteCallbacks released the ROOT lock : %+v", r)
+
 	for len(r.Callbacks) > 0 {
 		callback := r.Callbacks[0]
 		r.Callbacks = r.Callbacks[1:]
@@ -133,36 +132,32 @@
 // getCallbacks returns the available callbacks
 func (r *root) GetCallbacks() []CallbackTuple {
 	r.mutex.Lock()
-	log.Debugf("getCallbacks has the ROOT lock : %+v", r)
 	defer r.mutex.Unlock()
-	defer log.Debugf("getCallbacks released the ROOT lock : %+v", r)
+
 	return r.Callbacks
 }
 
 // getCallbacks returns the available notification callbacks
 func (r *root) GetNotificationCallbacks() []CallbackTuple {
 	r.mutex.Lock()
-	log.Debugf("GetNotificationCallbacks has the ROOT lock : %+v", r)
 	defer r.mutex.Unlock()
-	defer log.Debugf("GetNotificationCallbacks released the ROOT lock : %+v", r)
+
 	return r.NotificationCallbacks
 }
 
 // AddCallback inserts a new callback with its arguments
 func (r *root) AddCallback(callback CallbackFunction, args ...interface{}) {
 	r.mutex.Lock()
-	log.Debugf("AddCallback has the ROOT lock : %+v", r)
 	defer r.mutex.Unlock()
-	defer log.Debugf("AddCallback released the ROOT lock : %+v", r)
+
 	r.Callbacks = append(r.Callbacks, CallbackTuple{callback, args})
 }
 
 // AddNotificationCallback inserts a new notification callback with its arguments
 func (r *root) AddNotificationCallback(callback CallbackFunction, args ...interface{}) {
 	r.mutex.Lock()
-	log.Debugf("AddNotificationCallback has the ROOT lock : %+v", r)
 	defer r.mutex.Unlock()
-	defer log.Debugf("AddNotificationCallback released the ROOT lock : %+v", r)
+
 	r.NotificationCallbacks = append(r.NotificationCallbacks, CallbackTuple{callback, args})
 }
 
diff --git a/vendor/github.com/opencord/voltha-go/kafka/client.go b/vendor/github.com/opencord/voltha-go/kafka/client.go
old mode 100644
new mode 100755
index 3d37f6e..36c1ede
--- a/vendor/github.com/opencord/voltha-go/kafka/client.go
+++ b/vendor/github.com/opencord/voltha-go/kafka/client.go
@@ -16,8 +16,9 @@
 package kafka
 
 import (
-	ca "github.com/opencord/voltha-protos/go/inter_container"
 	"time"
+
+	ca "github.com/opencord/voltha-protos/go/inter_container"
 )
 
 const (
@@ -53,6 +54,7 @@
 	DefaultNumberPartitions         = 3
 	DefaultNumberReplicas           = 1
 	DefaultAutoCreateTopic          = false
+	DefaultMetadataMaxRetry         = 3
 )
 
 // MsgClient represents the set of APIs  a Kafka MsgClient must implement
diff --git a/vendor/github.com/opencord/voltha-go/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-go/kafka/sarama_client.go
old mode 100644
new mode 100755
index e920a83..0576da9
--- a/vendor/github.com/opencord/voltha-go/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-go/kafka/sarama_client.go
@@ -18,15 +18,16 @@
 import (
 	"errors"
 	"fmt"
+	"strings"
+	"sync"
+	"time"
+
 	scc "github.com/bsm/sarama-cluster"
 	"github.com/golang/protobuf/proto"
 	"github.com/google/uuid"
 	"github.com/opencord/voltha-go/common/log"
 	ic "github.com/opencord/voltha-protos/go/inter_container"
 	"gopkg.in/Shopify/sarama.v1"
-	"strings"
-	"sync"
-	"time"
 )
 
 func init() {
@@ -73,6 +74,7 @@
 	lockTopicToConsumerChannelMap sync.RWMutex
 	topicLockMap                  map[string]*sync.RWMutex
 	lockOfTopicLockMap            sync.RWMutex
+	metadataMaxRetry              int
 }
 
 type SaramaClientOption func(*SaramaClient)
@@ -179,6 +181,15 @@
 	}
 }
 
+// MetadatMaxRetries returns an option that overrides the client's metadataMaxRetry
+// (DefaultMetadataMaxRetry unless set), the value later assigned to the sarama
+// config's Metadata.Retry.Max. The exported name is spelled "Metadat" (sic).
+func MetadatMaxRetries(retry int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.metadataMaxRetry = retry
+	}
+}
+
 func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
 	client := &SaramaClient{
 		KafkaHost: DefaultKafkaHost,
@@ -197,6 +208,7 @@
 	client.numPartitions = DefaultNumberPartitions
 	client.numReplicas = DefaultNumberReplicas
 	client.autoCreateTopic = DefaultAutoCreateTopic
+	client.metadataMaxRetry = DefaultMetadataMaxRetry
 
 	for _, option := range opts {
 		option(client)
@@ -679,6 +691,7 @@
 	config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
 	config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
 	config.Consumer.Offsets.Initial = sarama.OffsetNewest
+	config.Metadata.Retry.Max = sc.metadataMaxRetry
 	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
 	brokers := []string{kafkaFullAddr}
 
diff --git a/vendor/github.com/opencord/voltha-go/rw_core/utils/core_utils.go b/vendor/github.com/opencord/voltha-go/rw_core/utils/core_utils.go
index 813c978..c9cd56d 100644
--- a/vendor/github.com/opencord/voltha-go/rw_core/utils/core_utils.go
+++ b/vendor/github.com/opencord/voltha-go/rw_core/utils/core_utils.go
@@ -18,6 +18,7 @@
 import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"os"
 	"reflect"
 	"time"
 )
@@ -30,6 +31,12 @@
 	Id string
 }
 
+// GetHostName returns the value of the HOSTNAME environment variable, or an
+// empty string when that variable is not set.
+func GetHostName() string {
+	return os.Getenv("HOSTNAME")
+}
+
 //WaitForNilOrErrorResponses waits on a variadic number of channels for either a nil response or an error
 //response. If an error is received from a given channel then the returned error array will contain that error.
 //The error will be at the index corresponding to the order in which the channel appear in the parameter list.
diff --git a/vendor/github.com/opencord/voltha-go/rw_core/utils/flow_utils.go b/vendor/github.com/opencord/voltha-go/rw_core/utils/flow_utils.go
index c1ca18d..3828b39 100644
--- a/vendor/github.com/opencord/voltha-go/rw_core/utils/flow_utils.go
+++ b/vendor/github.com/opencord/voltha-go/rw_core/utils/flow_utils.go
@@ -1082,7 +1082,7 @@
 
 	//Check match condition
 	//If the flow_mod match field is empty, that is a special case and indicates the flow entry matches
-	if (mod.Match == nil) || (mod.Match.OxmFields == nil) {
+	if (mod.Match == nil) || (mod.Match.OxmFields == nil) || (len(mod.Match.OxmFields) == 0) {
 		//If we got this far and the match is empty in the flow spec, than the flow matches
 		return true
 	} // TODO : implement the flow match analysis
@@ -1156,3 +1156,13 @@
 	}
 	return len(toKeep) < len(flows), toKeep
 }
+
+// ToOfpOxmField wraps each OFB field in from in an OfpOxmField and returns the
+// wrappers in the same order. The result is non-nil even when from is empty.
+func ToOfpOxmField(from []*ofp.OfpOxmOfbField) []*ofp.OfpOxmField {
+	matchFields := make([]*ofp.OfpOxmField, 0, len(from))
+	for _, val := range from {
+		matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})
+	}
+	return matchFields
+}