VOL-1387 : Added watch mechanism

- Fixed a few failure cases
- Adjusted a few logs

Change-Id: Ied1ecb3d8996a338eee00e9643685482700e860b
diff --git a/db/model/backend.go b/db/model/backend.go
index 693ec02..dc0e6bd 100644
--- a/db/model/backend.go
+++ b/db/model/backend.go
@@ -55,7 +55,12 @@
 
 	address := host + ":" + strconv.Itoa(port)
 	if b.Client, err = b.newClient(address, timeout); err != nil {
-		log.Errorf("failed to create a new kv Client - %s", err.Error())
+		log.Errorw("failed-to-create-kv-client",
+			log.Fields{
+				"type": storeType, "host": host, "port": port,
+				"timeout": timeout, "prefix": pathPrefix,
+				"error": err.Error(),
+			})
 	}
 
 	return b
@@ -68,7 +73,7 @@
 	case "etcd":
 		return kvstore.NewEtcdClient(address, timeout)
 	}
-	return nil, errors.New("Unsupported KV store")
+	return nil, errors.New("unsupported-kv-store")
 }
 
 func (b *Backend) makePath(key string) string {
@@ -82,7 +87,7 @@
 	defer b.Unlock()
 
 	formattedPath := b.makePath(key)
-	log.Debugf("List key: %s, path: %s", key, formattedPath)
+	log.Debugw("listing-key", log.Fields{"key": key, "path": formattedPath})
 
 	return b.Client.List(formattedPath, b.Timeout)
 }
@@ -93,7 +98,7 @@
 	defer b.Unlock()
 
 	formattedPath := b.makePath(key)
-	log.Debugf("Get key: %s, path: %s", key, formattedPath)
+	log.Debugw("getting-key", log.Fields{"key": key, "path": formattedPath})
 
 	start := time.Now()
 	err, pair := b.Client.Get(formattedPath, b.Timeout)
@@ -110,7 +115,7 @@
 	defer b.Unlock()
 
 	formattedPath := b.makePath(key)
-	log.Debugf("Put key: %s, value: %+v, path: %s", key, string(value.([]byte)), formattedPath)
+	log.Debugw("putting-key", log.Fields{"key": key, "value": string(value.([]byte)), "path": formattedPath})
 
 	return b.Client.Put(formattedPath, value, b.Timeout)
 }
@@ -121,7 +126,29 @@
 	defer b.Unlock()
 
 	formattedPath := b.makePath(key)
-	log.Debugf("Delete key: %s, path: %s", key, formattedPath)
+	log.Debugw("deleting-key", log.Fields{"key": key, "path": formattedPath})
 
 	return b.Client.Delete(formattedPath, b.Timeout)
 }
+
+// CreateWatch starts watching events for the specified key
+func (b *Backend) CreateWatch(key string) chan *kvstore.Event {
+	b.Lock()
+	defer b.Unlock()
+
+	formattedPath := b.makePath(key)
+	log.Debugw("creating-key-watch", log.Fields{"key": key, "path": formattedPath})
+
+	return b.Client.Watch(formattedPath)
+}
+
+// DeleteWatch stops watching events for the specified key
+func (b *Backend) DeleteWatch(key string, ch chan *kvstore.Event) {
+	b.Lock()
+	defer b.Unlock()
+
+	formattedPath := b.makePath(key)
+	log.Debugw("deleting-key-watch", log.Fields{"key": key, "path": formattedPath})
+
+	b.Client.CloseWatch(formattedPath, ch)
+}
diff --git a/db/model/branch.go b/db/model/branch.go
index d7fa092..2ad40b4 100644
--- a/db/model/branch.go
+++ b/db/model/branch.go
@@ -53,7 +53,6 @@
 
 	if b.Latest != nil {
 		log.Debugf("Switching latest from <%s> to <%s>", b.Latest.GetHash(), latest.GetHash())
-		b.Latest.Drop(b.Txid, false)
 	} else {
 		log.Debugf("Switching latest from <NIL> to <%s>", latest.GetHash())
 	}
diff --git a/db/model/node.go b/db/model/node.go
index 017f121..b8daadd 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -301,7 +301,7 @@
 	names := ChildrenFields(n.Type)
 	field := names[name]
 
-	if field.IsContainer {
+	if field != nil && field.IsContainer {
 		children := make([]Revision, len(rev.GetChildren()[name]))
 		copy(children, rev.GetChildren()[name])
 
@@ -557,6 +557,10 @@
 
 				// Prefix the hash with the data type (e.g. devices, logical_devices, adapters)
 				childRev.SetHash(name + "/" + key.String())
+
+				// Create watch for <component>/<key>
+				childRev.SetupWatch(childRev.GetHash())
+				
 				children = append(children, childRev)
 				rev = rev.UpdateChildren(name, children, branch)
 				changes := []ChangeTuple{{POST_ADD, nil, childRev.GetData()}}
diff --git a/db/model/non_persisted_revision.go b/db/model/non_persisted_revision.go
index 418a86e..66e6f74 100644
--- a/db/model/non_persisted_revision.go
+++ b/db/model/non_persisted_revision.go
@@ -233,6 +233,11 @@
 	npr.mutex.Lock()
 	defer npr.mutex.Unlock()
 
+	if npr.Config.Data != nil && npr.Config.Data == data {
+		log.Debugw("stored-data-matches-latest", log.Fields{"stored": npr.Config.Data, "provided": data})
+		return nil
+	}
+
 	newRev := NonPersistedRevision{}
 	newRev.Config = NewDataRevision(npr.Root, data)
 	newRev.Hash = npr.Hash
@@ -249,7 +254,10 @@
 }
 
 func (npr *NonPersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
-	updatedRev := *npr
+	npr.mutex.Lock()
+	defer npr.mutex.Unlock()
+
+	updatedRev := npr
 
 	// Verify if the map contains already contains an entry matching the name value
 	// If so, we need to retain the contents of that entry and merge them with the provided children revision list
@@ -272,14 +280,12 @@
 		copy(updatedRev.Children[name], children)
 	}
 
-	log.Debugf("Updated Children map entries: %+v", updatedRev.GetChildren())
-
 	updatedRev.Config = NewDataRevision(npr.Root, npr.Config.Data)
 	updatedRev.Hash = npr.Hash
 	updatedRev.Branch = branch
 	updatedRev.Finalize(false)
 
-	return &updatedRev
+	return updatedRev
 }
 
 func (npr *NonPersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
@@ -304,9 +310,6 @@
 	GetRevCache().Lock()
 	defer GetRevCache().Unlock()
 
-	npr.mutex.Lock()
-	defer npr.mutex.Unlock()
-
 	if includeConfig {
 		delete(GetRevCache().Cache, npr.Config.Hash)
 	}
@@ -314,5 +317,10 @@
 }
 
 func (npr *NonPersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
+	// stub... required by interface
 	return nil
 }
+
+func (npr *NonPersistedRevision) SetupWatch(key string) {
+	// stub ... required by interface
+}
\ No newline at end of file
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 69db753..dd24e7e 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -21,6 +21,7 @@
 	"compress/gzip"
 	"github.com/golang/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/db/kvstore"
 	"reflect"
 	"runtime/debug"
 	"strings"
@@ -29,10 +30,12 @@
 
 // PersistedRevision holds information of revision meant to be saved in a persistent storage
 type PersistedRevision struct {
-	mutex sync.RWMutex
 	Revision
 	Compress bool
-	kvStore  *Backend
+
+	events  chan *kvstore.Event `json:"-"`
+	kvStore *Backend            `json:"-"`
+	mutex   sync.RWMutex        `json:"-"`
 }
 
 // NewPersistedRevision creates a new instance of a PersistentRevision structure
@@ -59,9 +62,8 @@
 	}
 
 	if pair, _ := pr.kvStore.Get(pr.GetHash()); pair != nil && skipOnExist {
-		log.Debugf("Config already exists - hash:%s, stack: %s", pr.GetConfig().Hash, string(debug.Stack()))
+		log.Debugw("revision-config-already-exists", log.Fields{"hash": pr.GetHash()})
 		return
-		//}
 	}
 
 	if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
@@ -76,17 +78,69 @@
 		}
 
 		if err := pr.kvStore.Put(pr.GetHash(), blob); err != nil {
-			log.Warnf("Problem storing revision config - error: %s, hash: %s, data: %+v", err.Error(),
-				pr.GetHash(),
-				pr.GetConfig().Data)
+			log.Warnw("problem-storing-revision-config",
+				log.Fields{"error": err, "hash": pr.GetHash(), "data": pr.GetConfig().Data})
 		} else {
-			log.Debugf("Stored config - hash:%s, blob: %+v, stack: %s", pr.GetHash(), pr.GetConfig().Data,
-				string(debug.Stack()))
+			log.Debugw("storing-revision-config",
+				log.Fields{"hash": pr.GetHash(), "data": pr.GetConfig().Data})
 		}
 	}
 }
 
+func (pr *PersistedRevision) SetupWatch(key string) {
+	if pr.events == nil {
+		pr.events = make(chan *kvstore.Event)
+
+		log.Debugw("setting-watch", log.Fields{"key": key})
+
+		pr.events = pr.kvStore.CreateWatch(key)
+
+		// Start watching
+		go pr.startWatching()
+	}
+}
+
+func (pr *PersistedRevision) startWatching() {
+	log.Debugw("starting-watch", log.Fields{"key": pr.GetHash()})
+
+StopWatchLoop:
+	for {
+		select {
+		case event, ok := <-pr.events:
+			if !ok {
+				log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash()})
+				break StopWatchLoop
+			}
+
+			log.Debugw("received-event", log.Fields{"type": event.EventType})
+
+			switch event.EventType {
+			case kvstore.DELETE:
+				log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash()})
+				pr.Revision.Drop("", true)
+				break StopWatchLoop
+
+			case kvstore.PUT:
+				log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash()})
+
+				if dataPair, err := pr.kvStore.Get(pr.GetHash()); err != nil || dataPair == nil {
+					log.Errorw("update-in-memory--key-retrieval-failed", log.Fields{"key": pr.GetHash(), "error": err})
+				} else {
+					pr.UpdateData(dataPair.Value, pr.GetBranch())
+				}
+
+			default:
+				log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "type": event.EventType})
+			}
+		}
+	}
+
+	log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash()})
+}
+
 func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
+	log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
+
 	var response []Revision
 	var rev Revision
 
@@ -129,6 +183,10 @@
 
 								childRev := rev.GetBranch().Node.MakeNode(data.Interface(), txid).Latest(txid)
 								childRev.SetHash(name + "/" + key.String())
+
+								// Create watch for <component>/<key>
+								pr.SetupWatch(childRev.GetHash())
+
 								children = append(children, childRev)
 								rev = rev.UpdateChildren(name, children, rev.GetBranch())
 
@@ -174,9 +232,10 @@
 
 // UpdateData modifies the information in the data model and saves it in the persistent storage
 func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
+	log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
+
 	newNPR := pr.Revision.UpdateData(data, branch)
 
-	log.Debugf("Updating data %+v", data)
 	newPR := &PersistedRevision{
 		Revision: newNPR,
 		Compress: pr.Compress,
@@ -188,6 +247,8 @@
 
 // UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
 func (pr *PersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
+	log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
+
 	newNPR := pr.Revision.UpdateChildren(name, children, branch)
 
 	newPR := &PersistedRevision{
@@ -201,6 +262,8 @@
 
 // UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
 func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
+	log.Debugw("updating-all-persisted-children", log.Fields{"hash": pr.GetHash()})
+
 	newNPR := pr.Revision.UpdateAllChildren(children, branch)
 
 	newPR := &PersistedRevision{
@@ -215,29 +278,29 @@
 // Drop takes care of eliminating a revision hash that is no longer needed
 // and its associated config when required
 func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
+	log.Debugw("dropping-revision",
+		log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash, "stack": string(debug.Stack())})
+
 	pr.mutex.Lock()
 	defer pr.mutex.Unlock()
 	if pr.kvStore != nil && txid == "" {
 		if includeConfig {
-			log.Debugf("removing rev config - hash: %s", pr.GetConfig().Hash)
 			if err := pr.kvStore.Delete(pr.GetConfig().Hash); err != nil {
-				log.Errorf(
-					"failed to remove rev config - hash: %s, err: %s",
-					pr.GetConfig().Hash,
-					err.Error(),
-				)
+				log.Errorw("failed-to-remove-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "error": err.Error()})
 			}
 		}
 
-		log.Debugf("removing rev - hash: %s", pr.GetHash())
 		if err := pr.kvStore.Delete(pr.GetHash()); err != nil {
-			log.Errorf("failed to remove rev - hash: %s, err: %s", pr.GetHash(), err.Error())
+			log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
 		}
+
+		pr.kvStore.DeleteWatch(pr.GetConfig().Hash, pr.events)
+
 	} else {
 		if includeConfig {
-			log.Debugf("Attempted to remove revision config:%s linked to transaction:%s", pr.GetConfig().Hash, txid)
+			log.Debugw("attempted-to-remove-transacted-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "txid": txid})
 		}
-		log.Debugf("Attempted to remove revision:%s linked to transaction:%s", pr.GetHash(), txid)
+		log.Debugw("attempted-to-remove-transacted-revision", log.Fields{"hash": pr.GetHash(), "txid": txid})
 	}
 
 	pr.Revision.Drop(txid, includeConfig)
diff --git a/db/model/revision.go b/db/model/revision.go
index a848bbf..0a58ccd 100644
--- a/db/model/revision.go
+++ b/db/model/revision.go
@@ -25,6 +25,7 @@
 	SetHash(hash string)
 	GetHash() string
 	ClearHash()
+	SetupWatch(key string)
 	SetBranch(branch *Branch)
 	GetBranch() *Branch
 	Get(int) interface{}
diff --git a/db/model/root.go b/db/model/root.go
index c97cb8c..730cd39 100644
--- a/db/model/root.go
+++ b/db/model/root.go
@@ -202,10 +202,12 @@
 		result = r.node.Update(path, data, strict, "", nil)
 	}
 
-	if r.Proxy.FullPath != r.Proxy.Path {
-		r.syncParent(result, txid)
-	} else {
-		result.Finalize(false)
+	if result != nil {
+		if r.Proxy.FullPath != r.Proxy.Path {
+			r.syncParent(result, txid)
+		} else {
+			result.Finalize(false)
+		}
 	}
 
 	r.node.GetRoot().ExecuteCallbacks()