VOL-1173 : Removed hash based storage; replaced with per device protobuf

- Ensured proxies issue callbacks instead of forcing with goroutines
- Fixed mutex issue with proxy component

Change-Id: Idabd3257c6d264c0f607ee228e406810304dab43
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 3682694..69db753 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -19,14 +19,12 @@
 import (
 	"bytes"
 	"compress/gzip"
-	"encoding/json"
 	"github.com/golang/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
-	"io/ioutil"
 	"reflect"
 	"runtime/debug"
+	"strings"
 	"sync"
-	"time"
 )
 
 // PersistedRevision holds information of revision meant to be saved in a persistent storage
@@ -42,13 +40,12 @@
 	pr := &PersistedRevision{}
 	pr.kvStore = branch.Node.GetRoot().KvStore
 	pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
-	pr.Finalize()
 	return pr
 }
 
 // Finalize is responsible of saving the revision in the persistent storage
-func (pr *PersistedRevision) Finalize() {
-	pr.store()
+func (pr *PersistedRevision) Finalize(skipOnExist bool) {
+	pr.store(skipOnExist)
 }
 
 type revData struct {
@@ -56,102 +53,17 @@
 	Config   string
 }
 
-func (pr *PersistedRevision) store() {
+func (pr *PersistedRevision) store(skipOnExist bool) {
 	if pr.GetBranch().Txid != "" {
 		return
 	}
-	if ok, _ := pr.kvStore.Get(pr.Revision.GetHash()); ok != nil {
-		log.Debugf("Entry already exists - hash:%s, stack: %s", pr.Revision.GetHash(), string(debug.Stack()))
-		return
-	}
 
-	pr.storeConfig()
-
-	childrenHashes := make(map[string][]string)
-	for fieldName, children := range pr.GetChildren() {
-		hashes := []string{}
-		for _, rev := range children {
-			if rev != nil {
-				hashes = append(hashes, rev.GetHash())
-			}
-		}
-		childrenHashes[fieldName] = hashes
-	}
-	data := &revData{
-		Children: childrenHashes,
-		Config:   pr.GetConfig().Hash,
-	}
-	if blob, err := json.Marshal(data); err != nil {
-		// TODO report error
-	} else {
-		if pr.Compress {
-			var b bytes.Buffer
-			w := gzip.NewWriter(&b)
-			w.Write(blob)
-			w.Close()
-			blob = b.Bytes()
-		}
-		if err := pr.kvStore.Put(pr.GetHash(), blob); err != nil {
-			log.Warnf("Problem storing revision - error: %s, hash: %s, data: %s", err.Error(), pr.GetHash(),
-				string(blob))
-		} else {
-			log.Debugf("Stored entry - hash:%s, blob: %s, stack: %s", pr.Revision.GetHash(), string(blob),
-				string(debug.Stack()))
-		}
-	}
-}
-
-// Load retrieves a revision from th persistent storage
-func (pr *PersistedRevision) Load(branch *Branch, kvStore *Backend, msgClass interface{}, hash string) Revision {
-	blob, _ := kvStore.Get(hash)
-
-	start := time.Now()
-	output := blob.Value.([]byte)
-	var data revData
-	if pr.Compress {
-		b := bytes.NewBuffer(blob.Value.([]byte))
-		if r, err := gzip.NewReader(b); err != nil {
-			// TODO : report error
-		} else {
-			if output, err = ioutil.ReadAll(r); err != nil {
-				// TODO report error
-			}
-		}
-	}
-	if err := json.Unmarshal(output, &data); err != nil {
-		log.Errorf("problem to unmarshal data - %s", err.Error())
-	}
-
-	stop := time.Now()
-	GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
-	configHash := data.Config
-	configData := pr.loadConfig(kvStore, msgClass, configHash)
-
-	assembledChildren := make(map[string][]Revision)
-
-	childrenHashes := data.Children
-	node := branch.Node
-	for fieldName, child := range ChildrenFields(msgClass) {
-		var children []Revision
-		for _, childHash := range childrenHashes[fieldName] {
-			childNode := node.MakeNode(reflect.New(child.ClassType).Elem().Interface(), "")
-			childNode.LoadLatest(childHash)
-			childRev := childNode.Latest()
-			children = append(children, childRev)
-		}
-		assembledChildren[fieldName] = children
-	}
-
-	rev := NewPersistedRevision(branch, configData, assembledChildren)
-	return rev
-}
-
-// storeConfig saves the data associated to a revision in the persistent storage
-func (pr *PersistedRevision) storeConfig() {
-	if ok, _ := pr.kvStore.Get(pr.GetConfig().Hash); ok != nil {
+	if pair, _ := pr.kvStore.Get(pr.GetHash()); pair != nil && skipOnExist {
 		log.Debugf("Config already exists - hash:%s, stack: %s", pr.GetConfig().Hash, string(debug.Stack()))
 		return
+		//}
 	}
+
 	if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
 		// TODO report error
 	} else {
@@ -163,46 +75,101 @@
 			blob = b.Bytes()
 		}
 
-		if err := pr.kvStore.Put(pr.GetConfig().Hash, blob); err != nil {
+		if err := pr.kvStore.Put(pr.GetHash(), blob); err != nil {
 			log.Warnf("Problem storing revision config - error: %s, hash: %s, data: %+v", err.Error(),
-				pr.GetConfig().Hash,
+				pr.GetHash(),
 				pr.GetConfig().Data)
 		} else {
-			log.Debugf("Stored config - hash:%s, blob: %+v, stack: %s", pr.GetConfig().Hash, pr.GetConfig().Data,
+			log.Debugf("Stored config - hash:%s, blob: %+v, stack: %s", pr.GetHash(), pr.GetConfig().Data,
 				string(debug.Stack()))
 		}
 	}
 }
 
-// loadConfig restores the data associated to a revision from the persistent storage
-func (pr *PersistedRevision) loadConfig(kvStore *Backend, msgClass interface{}, hash string) interface{} {
-	blob, _ := kvStore.Get(hash)
-	start := time.Now()
-	output := blob.Value.([]byte)
+func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
+	var response []Revision
+	var rev Revision
 
-	if pr.Compress {
-		b := bytes.NewBuffer(blob.Value.([]byte))
-		if r, err := gzip.NewReader(b); err != nil {
-			// TODO : report error
+	rev = pr
+
+	if pr.kvStore != nil {
+		blobMap, _ := pr.kvStore.List(path)
+
+		partition := strings.SplitN(path, "/", 2)
+		name := partition[0]
+
+		if len(partition) < 2 {
+			path = ""
 		} else {
-			if output, err = ioutil.ReadAll(r); err != nil {
-				// TODO report error
+			path = partition[1]
+		}
+
+		field := ChildrenFields(rev.GetBranch().Node.Type)[name]
+
+		if field.IsContainer {
+			for _, blob := range blobMap {
+				output := blob.Value.([]byte)
+
+				data := reflect.New(field.ClassType.Elem())
+
+				if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
+					// TODO report error
+				} else {
+
+					var children []Revision
+
+					if path == "" {
+						if field.Key != "" {
+							// e.g. /logical_devices/abcde --> path="" name=logical_devices key=abcde
+							if field.Key != "" {
+								children = make([]Revision, len(rev.GetChildren()[name]))
+								copy(children, rev.GetChildren()[name])
+
+								_, key := GetAttributeValue(data.Interface(), field.Key, 0)
+
+								childRev := rev.GetBranch().Node.MakeNode(data.Interface(), txid).Latest(txid)
+								childRev.SetHash(name + "/" + key.String())
+								children = append(children, childRev)
+								rev = rev.UpdateChildren(name, children, rev.GetBranch())
+
+								rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
+
+								response = append(response, childRev)
+								continue
+							}
+						}
+					} else if field.Key != "" {
+						// e.g. /logical_devices/abcde/flows/vwxyz --> path=abcde/flows/vwxyz
+
+						partition := strings.SplitN(path, "/", 2)
+						key := partition[0]
+						if len(partition) < 2 {
+							path = ""
+						} else {
+							path = partition[1]
+						}
+						keyValue := field.KeyFromStr(key)
+
+						children = make([]Revision, len(rev.GetChildren()[name]))
+						copy(children, rev.GetChildren()[name])
+
+						idx, childRev := rev.GetBranch().Node.findRevByKey(children, field.Key, keyValue)
+
+						newChildRev := childRev.LoadFromPersistence(path, txid)
+
+						children[idx] = newChildRev[0]
+
+						rev := rev.UpdateChildren(name, rev.GetChildren()[name], rev.GetBranch())
+						rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
+
+						response = append(response, newChildRev[0])
+						continue
+					}
+				}
 			}
 		}
 	}
-
-	var data reflect.Value
-	if msgClass != nil {
-		data = reflect.New(reflect.TypeOf(msgClass).Elem())
-		if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
-			// TODO report error
-		}
-	}
-
-	stop := time.Now()
-
-	GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
-	return data.Interface()
+	return response
 }
 
 // UpdateData modifies the information in the data model and saves it in the persistent storage
@@ -216,8 +183,6 @@
 		kvStore:  pr.kvStore,
 	}
 
-	newPR.Finalize()
-
 	return newPR
 }
 
@@ -231,8 +196,6 @@
 		kvStore:  pr.kvStore,
 	}
 
-	newPR.Finalize()
-
 	return newPR
 }
 
@@ -246,8 +209,6 @@
 		kvStore:  pr.kvStore,
 	}
 
-	newPR.Finalize()
-
 	return newPR
 }