VOL-1173 : Removed hash based storage; replaced with per device protobuf
- Ensured proxies issue callbacks instead of forcing with goroutines
- Fixed mutex issue with proxy component
Change-Id: Idabd3257c6d264c0f607ee228e406810304dab43
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 3682694..69db753 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -19,14 +19,12 @@
import (
"bytes"
"compress/gzip"
- "encoding/json"
"github.com/golang/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
- "io/ioutil"
"reflect"
"runtime/debug"
+ "strings"
"sync"
- "time"
)
// PersistedRevision holds information of revision meant to be saved in a persistent storage
@@ -42,13 +40,12 @@
pr := &PersistedRevision{}
pr.kvStore = branch.Node.GetRoot().KvStore
pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
- pr.Finalize()
return pr
}
// Finalize is responsible of saving the revision in the persistent storage
-func (pr *PersistedRevision) Finalize() {
- pr.store()
+func (pr *PersistedRevision) Finalize(skipOnExist bool) {
+ pr.store(skipOnExist)
}
type revData struct {
@@ -56,102 +53,17 @@
Config string
}
-func (pr *PersistedRevision) store() {
+func (pr *PersistedRevision) store(skipOnExist bool) {
if pr.GetBranch().Txid != "" {
return
}
- if ok, _ := pr.kvStore.Get(pr.Revision.GetHash()); ok != nil {
- log.Debugf("Entry already exists - hash:%s, stack: %s", pr.Revision.GetHash(), string(debug.Stack()))
- return
- }
- pr.storeConfig()
-
- childrenHashes := make(map[string][]string)
- for fieldName, children := range pr.GetChildren() {
- hashes := []string{}
- for _, rev := range children {
- if rev != nil {
- hashes = append(hashes, rev.GetHash())
- }
- }
- childrenHashes[fieldName] = hashes
- }
- data := &revData{
- Children: childrenHashes,
- Config: pr.GetConfig().Hash,
- }
- if blob, err := json.Marshal(data); err != nil {
- // TODO report error
- } else {
- if pr.Compress {
- var b bytes.Buffer
- w := gzip.NewWriter(&b)
- w.Write(blob)
- w.Close()
- blob = b.Bytes()
- }
- if err := pr.kvStore.Put(pr.GetHash(), blob); err != nil {
- log.Warnf("Problem storing revision - error: %s, hash: %s, data: %s", err.Error(), pr.GetHash(),
- string(blob))
- } else {
- log.Debugf("Stored entry - hash:%s, blob: %s, stack: %s", pr.Revision.GetHash(), string(blob),
- string(debug.Stack()))
- }
- }
-}
-
-// Load retrieves a revision from th persistent storage
-func (pr *PersistedRevision) Load(branch *Branch, kvStore *Backend, msgClass interface{}, hash string) Revision {
- blob, _ := kvStore.Get(hash)
-
- start := time.Now()
- output := blob.Value.([]byte)
- var data revData
- if pr.Compress {
- b := bytes.NewBuffer(blob.Value.([]byte))
- if r, err := gzip.NewReader(b); err != nil {
- // TODO : report error
- } else {
- if output, err = ioutil.ReadAll(r); err != nil {
- // TODO report error
- }
- }
- }
- if err := json.Unmarshal(output, &data); err != nil {
- log.Errorf("problem to unmarshal data - %s", err.Error())
- }
-
- stop := time.Now()
- GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
- configHash := data.Config
- configData := pr.loadConfig(kvStore, msgClass, configHash)
-
- assembledChildren := make(map[string][]Revision)
-
- childrenHashes := data.Children
- node := branch.Node
- for fieldName, child := range ChildrenFields(msgClass) {
- var children []Revision
- for _, childHash := range childrenHashes[fieldName] {
- childNode := node.MakeNode(reflect.New(child.ClassType).Elem().Interface(), "")
- childNode.LoadLatest(childHash)
- childRev := childNode.Latest()
- children = append(children, childRev)
- }
- assembledChildren[fieldName] = children
- }
-
- rev := NewPersistedRevision(branch, configData, assembledChildren)
- return rev
-}
-
-// storeConfig saves the data associated to a revision in the persistent storage
-func (pr *PersistedRevision) storeConfig() {
- if ok, _ := pr.kvStore.Get(pr.GetConfig().Hash); ok != nil {
+ if pair, _ := pr.kvStore.Get(pr.GetHash()); pair != nil && skipOnExist {
log.Debugf("Config already exists - hash:%s, stack: %s", pr.GetConfig().Hash, string(debug.Stack()))
return
+ //}
}
+
if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
// TODO report error
} else {
@@ -163,46 +75,101 @@
blob = b.Bytes()
}
- if err := pr.kvStore.Put(pr.GetConfig().Hash, blob); err != nil {
+ if err := pr.kvStore.Put(pr.GetHash(), blob); err != nil {
log.Warnf("Problem storing revision config - error: %s, hash: %s, data: %+v", err.Error(),
- pr.GetConfig().Hash,
+ pr.GetHash(),
pr.GetConfig().Data)
} else {
- log.Debugf("Stored config - hash:%s, blob: %+v, stack: %s", pr.GetConfig().Hash, pr.GetConfig().Data,
+ log.Debugf("Stored config - hash:%s, blob: %+v, stack: %s", pr.GetHash(), pr.GetConfig().Data,
string(debug.Stack()))
}
}
}
-// loadConfig restores the data associated to a revision from the persistent storage
-func (pr *PersistedRevision) loadConfig(kvStore *Backend, msgClass interface{}, hash string) interface{} {
- blob, _ := kvStore.Get(hash)
- start := time.Now()
- output := blob.Value.([]byte)
+func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
+ var response []Revision
+ var rev Revision
- if pr.Compress {
- b := bytes.NewBuffer(blob.Value.([]byte))
- if r, err := gzip.NewReader(b); err != nil {
- // TODO : report error
+ rev = pr
+
+ if pr.kvStore != nil {
+ blobMap, _ := pr.kvStore.List(path)
+
+ partition := strings.SplitN(path, "/", 2)
+ name := partition[0]
+
+ if len(partition) < 2 {
+ path = ""
} else {
- if output, err = ioutil.ReadAll(r); err != nil {
- // TODO report error
+ path = partition[1]
+ }
+
+ field := ChildrenFields(rev.GetBranch().Node.Type)[name]
+
+ if field.IsContainer {
+ for _, blob := range blobMap {
+ output := blob.Value.([]byte)
+
+ data := reflect.New(field.ClassType.Elem())
+
+ if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
+ // TODO report error
+ } else {
+
+ var children []Revision
+
+ if path == "" {
+ if field.Key != "" {
+ // e.g. /logical_devices/abcde --> path="" name=logical_devices key=abcde
+ if field.Key != "" {
+ children = make([]Revision, len(rev.GetChildren()[name]))
+ copy(children, rev.GetChildren()[name])
+
+ _, key := GetAttributeValue(data.Interface(), field.Key, 0)
+
+ childRev := rev.GetBranch().Node.MakeNode(data.Interface(), txid).Latest(txid)
+ childRev.SetHash(name + "/" + key.String())
+ children = append(children, childRev)
+ rev = rev.UpdateChildren(name, children, rev.GetBranch())
+
+ rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
+
+ response = append(response, childRev)
+ continue
+ }
+ }
+ } else if field.Key != "" {
+ // e.g. /logical_devices/abcde/flows/vwxyz --> path=abcde/flows/vwxyz
+
+ partition := strings.SplitN(path, "/", 2)
+ key := partition[0]
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
+ keyValue := field.KeyFromStr(key)
+
+ children = make([]Revision, len(rev.GetChildren()[name]))
+ copy(children, rev.GetChildren()[name])
+
+ idx, childRev := rev.GetBranch().Node.findRevByKey(children, field.Key, keyValue)
+
+ newChildRev := childRev.LoadFromPersistence(path, txid)
+
+ children[idx] = newChildRev[0]
+
+ rev := rev.UpdateChildren(name, rev.GetChildren()[name], rev.GetBranch())
+ rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
+
+ response = append(response, newChildRev[0])
+ continue
+ }
+ }
}
}
}
-
- var data reflect.Value
- if msgClass != nil {
- data = reflect.New(reflect.TypeOf(msgClass).Elem())
- if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
- // TODO report error
- }
- }
-
- stop := time.Now()
-
- GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
- return data.Interface()
+ return response
}
// UpdateData modifies the information in the data model and saves it in the persistent storage
@@ -216,8 +183,6 @@
kvStore: pr.kvStore,
}
- newPR.Finalize()
-
return newPR
}
@@ -231,8 +196,6 @@
kvStore: pr.kvStore,
}
- newPR.Finalize()
-
return newPR
}
@@ -246,8 +209,6 @@
kvStore: pr.kvStore,
}
- newPR.Finalize()
-
return newPR
}