VOL-1173 : Removed hash based storage; replaced with per device protobuf
- Ensured proxies issue callbacks instead of forcing with goroutines
- Fixed mutex issue with proxy component
Change-Id: Idabd3257c6d264c0f607ee228e406810304dab43
diff --git a/db/model/base_test.go b/db/model/base_test.go
index 010dff9..26edce6 100644
--- a/db/model/base_test.go
+++ b/db/model/base_test.go
@@ -36,7 +36,7 @@
var (
modelTestConfig = &ModelTestConfig{
- DbPrefix: "service/voltha/data/core/0001",
+ DbPrefix: "service/voltha",
DbType: "etcd",
DbHost: "localhost",
//DbHost: "10.106.153.44",
@@ -110,11 +110,11 @@
root := NewRoot(msgClass, modelTestConfig.Backend)
//root := NewRoot(msgClass, nil)
- if modelTestConfig.Backend != nil {
- modelTestConfig.Root = root.Load(msgClass)
- } else {
- modelTestConfig.Root = root
- }
+ //if modelTestConfig.Backend != nil {
+ //modelTestConfig.Root = root.Load(msgClass)
+ //} else {
+ modelTestConfig.Root = root
+ //}
GetProfiling().Report()
diff --git a/db/model/node.go b/db/model/node.go
index 7ea41ce..bcda91e 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -227,6 +227,7 @@
func (n *node) findRevByKey(revs []Revision, keyName string, value interface{}) (int, Revision) {
n.Lock()
defer n.Unlock()
+
for i, rev := range revs {
dataValue := reflect.ValueOf(rev.GetData())
dataStruct := GetAttributeStructure(rev.GetData(), keyName, 0)
@@ -266,7 +267,20 @@
rev = branch.GetLatest()
}
- return n.getPath(rev, path, depth)
+ var result interface{}
+ if result = n.getPath(rev.GetBranch().GetLatest(), path, depth);
+ reflect.ValueOf(result).IsValid() && reflect.ValueOf(result).IsNil() && n.Root.KvStore != nil {
+ // We got nothing from memory, try to pull it from persistence
+ var prList []interface{}
+ if pr := rev.LoadFromPersistence(path, txid); pr != nil {
+ for _, revEntry := range pr {
+ prList = append(prList, revEntry.GetData())
+ }
+ result = prList
+ }
+ }
+
+ return result
}
// getPath traverses the specified path and retrieves the data associated to it
@@ -357,7 +371,7 @@
var branch *Branch
if txid == "" {
branch = n.GetBranch(NONE)
- } else if branch = n.GetBranch(txid); &branch == nil {
+ } else if branch = n.GetBranch(txid); branch == nil {
branch = makeBranch(n)
}
@@ -420,6 +434,8 @@
log.Errorf("cannot change key field")
}
+ // Prefix the hash value with the data type (e.g. devices, logical_devices, adapters)
+ newChildRev.SetHash(name + "/" + _keyValueType)
children[idx] = newChildRev
updatedRev := rev.UpdateChildren(name, children, branch)
@@ -438,8 +454,10 @@
updatedRev := rev.UpdateChildren(name, []Revision{newChildRev}, branch)
rev.Drop(txid, false)
n.makeLatest(branch, updatedRev, nil)
+
return newChildRev
}
+
return nil
}
@@ -536,6 +554,9 @@
return exists
}
childRev := n.MakeNode(data, txid).Latest(txid)
+
+ // Prefix the hash with the data type (e.g. devices, logical_devices, adapters)
+ childRev.SetHash(name + "/" + key.String())
children = append(children, childRev)
rev = rev.UpdateChildren(name, children, branch)
changes := []ChangeTuple{{POST_ADD, nil, childRev.GetData()}}
@@ -753,15 +774,15 @@
// CreateProxy returns a reference to a sub-tree of the data model
func (n *node) CreateProxy(path string, exclusive bool) *Proxy {
- return n.createProxy(path, path, exclusive)
+ return n.createProxy(path, path, n, exclusive)
}
-func (n *node) createProxy(path string, fullPath string, exclusive bool) *Proxy {
+func (n *node) createProxy(path string, fullPath string, parentNode *node, exclusive bool) *Proxy {
for strings.HasPrefix(path, "/") {
path = path[1:]
}
if path == "" {
- return n.makeProxy(path, fullPath, exclusive)
+ return n.makeProxy(path, fullPath, parentNode, exclusive)
}
rev := n.GetBranch(NONE).GetLatest()
@@ -792,20 +813,20 @@
_, childRev := n.findRevByKey(children, field.Key, keyValue)
childNode := childRev.GetNode()
- return childNode.createProxy(path, fullPath, exclusive)
+ return childNode.createProxy(path, fullPath, n, exclusive)
} else {
log.Error("cannot index into container with no keys")
}
} else {
childRev := rev.GetChildren()[name][0]
childNode := childRev.GetNode()
- return childNode.createProxy(path, fullPath, exclusive)
+ return childNode.createProxy(path, fullPath, n, exclusive)
}
return nil
}
-func (n *node) makeProxy(path string, fullPath string, exclusive bool) *Proxy {
+func (n *node) makeProxy(path string, fullPath string, parentNode *node, exclusive bool) *Proxy {
n.Lock()
defer n.Unlock()
r := &root{
@@ -819,7 +840,7 @@
}
if n.Proxy == nil {
- n.Proxy = NewProxy(r, n, path, fullPath, exclusive)
+ n.Proxy = NewProxy(r, n, parentNode, path, fullPath, exclusive)
} else {
if n.Proxy.Exclusive {
log.Error("node is already owned exclusively")
@@ -838,17 +859,6 @@
return n.EventBus
}
-// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Persistence Loading ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-// LoadLatest accesses the persistent storage to construct the data model based on the stored information
-func (n *node) LoadLatest(hash string) {
- branch := NewBranch(n, "", nil, n.AutoPrune)
- pr := &PersistedRevision{}
- rev := pr.Load(branch, n.GetRoot().KvStore, n.Type, hash)
- n.makeLatest(branch, rev, nil)
- n.SetBranch(NONE, branch)
-}
-
func (n *node) SetProxy(proxy *Proxy) {
n.Lock()
defer n.Unlock()
@@ -864,8 +874,10 @@
func (n *node) GetBranch(key string) *Branch {
n.Lock()
defer n.Unlock()
- if branch, exists := n.Branches[key]; exists {
- return branch
+ if n.Branches != nil {
+ if branch, exists := n.Branches[key]; exists {
+ return branch
+ }
}
return nil
}
diff --git a/db/model/non_persisted_revision.go b/db/model/non_persisted_revision.go
index 811e35d..e93d0ff 100644
--- a/db/model/non_persisted_revision.go
+++ b/db/model/non_persisted_revision.go
@@ -15,7 +15,6 @@
*/
package model
-import "C"
import (
"bytes"
"crypto/md5"
@@ -57,7 +56,6 @@
r.Branch = branch
r.Config = NewDataRevision(root, data)
r.Children = children
- r.Finalize()
return r
}
@@ -130,7 +128,7 @@
return npr.Branch.Node
}
-func (npr *NonPersistedRevision) Finalize() {
+func (npr *NonPersistedRevision) Finalize(skipOnExist bool) {
GetRevCache().Lock()
defer GetRevCache().Unlock()
@@ -244,7 +242,7 @@
newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
}
- newRev.Finalize()
+ newRev.Finalize(false)
return &newRev
}
@@ -266,7 +264,7 @@
newRev.Config = NewDataRevision(npr.Root, npr.Config.Data)
newRev.Hash = npr.Hash
newRev.Branch = branch
- newRev.Finalize()
+ newRev.Finalize(false)
return &newRev
}
@@ -284,7 +282,7 @@
newRev.Children[entryName] = make([]Revision, len(childrenEntry))
copy(newRev.Children[entryName], childrenEntry)
}
- newRev.Finalize()
+ newRev.Finalize(false)
return newRev
}
@@ -301,3 +299,7 @@
}
delete(GetRevCache().Cache, npr.Hash)
}
+
+func (npr *NonPersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
+ return nil
+}
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 3682694..69db753 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -19,14 +19,12 @@
import (
"bytes"
"compress/gzip"
- "encoding/json"
"github.com/golang/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
- "io/ioutil"
"reflect"
"runtime/debug"
+ "strings"
"sync"
- "time"
)
// PersistedRevision holds information of revision meant to be saved in a persistent storage
@@ -42,13 +40,12 @@
pr := &PersistedRevision{}
pr.kvStore = branch.Node.GetRoot().KvStore
pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
- pr.Finalize()
return pr
}
// Finalize is responsible of saving the revision in the persistent storage
-func (pr *PersistedRevision) Finalize() {
- pr.store()
+func (pr *PersistedRevision) Finalize(skipOnExist bool) {
+ pr.store(skipOnExist)
}
type revData struct {
@@ -56,102 +53,17 @@
Config string
}
-func (pr *PersistedRevision) store() {
+func (pr *PersistedRevision) store(skipOnExist bool) {
if pr.GetBranch().Txid != "" {
return
}
- if ok, _ := pr.kvStore.Get(pr.Revision.GetHash()); ok != nil {
- log.Debugf("Entry already exists - hash:%s, stack: %s", pr.Revision.GetHash(), string(debug.Stack()))
- return
- }
- pr.storeConfig()
-
- childrenHashes := make(map[string][]string)
- for fieldName, children := range pr.GetChildren() {
- hashes := []string{}
- for _, rev := range children {
- if rev != nil {
- hashes = append(hashes, rev.GetHash())
- }
- }
- childrenHashes[fieldName] = hashes
- }
- data := &revData{
- Children: childrenHashes,
- Config: pr.GetConfig().Hash,
- }
- if blob, err := json.Marshal(data); err != nil {
- // TODO report error
- } else {
- if pr.Compress {
- var b bytes.Buffer
- w := gzip.NewWriter(&b)
- w.Write(blob)
- w.Close()
- blob = b.Bytes()
- }
- if err := pr.kvStore.Put(pr.GetHash(), blob); err != nil {
- log.Warnf("Problem storing revision - error: %s, hash: %s, data: %s", err.Error(), pr.GetHash(),
- string(blob))
- } else {
- log.Debugf("Stored entry - hash:%s, blob: %s, stack: %s", pr.Revision.GetHash(), string(blob),
- string(debug.Stack()))
- }
- }
-}
-
-// Load retrieves a revision from th persistent storage
-func (pr *PersistedRevision) Load(branch *Branch, kvStore *Backend, msgClass interface{}, hash string) Revision {
- blob, _ := kvStore.Get(hash)
-
- start := time.Now()
- output := blob.Value.([]byte)
- var data revData
- if pr.Compress {
- b := bytes.NewBuffer(blob.Value.([]byte))
- if r, err := gzip.NewReader(b); err != nil {
- // TODO : report error
- } else {
- if output, err = ioutil.ReadAll(r); err != nil {
- // TODO report error
- }
- }
- }
- if err := json.Unmarshal(output, &data); err != nil {
- log.Errorf("problem to unmarshal data - %s", err.Error())
- }
-
- stop := time.Now()
- GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
- configHash := data.Config
- configData := pr.loadConfig(kvStore, msgClass, configHash)
-
- assembledChildren := make(map[string][]Revision)
-
- childrenHashes := data.Children
- node := branch.Node
- for fieldName, child := range ChildrenFields(msgClass) {
- var children []Revision
- for _, childHash := range childrenHashes[fieldName] {
- childNode := node.MakeNode(reflect.New(child.ClassType).Elem().Interface(), "")
- childNode.LoadLatest(childHash)
- childRev := childNode.Latest()
- children = append(children, childRev)
- }
- assembledChildren[fieldName] = children
- }
-
- rev := NewPersistedRevision(branch, configData, assembledChildren)
- return rev
-}
-
-// storeConfig saves the data associated to a revision in the persistent storage
-func (pr *PersistedRevision) storeConfig() {
- if ok, _ := pr.kvStore.Get(pr.GetConfig().Hash); ok != nil {
+ if pair, _ := pr.kvStore.Get(pr.GetHash()); pair != nil && skipOnExist {
log.Debugf("Config already exists - hash:%s, stack: %s", pr.GetConfig().Hash, string(debug.Stack()))
return
+ //}
}
+
if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
// TODO report error
} else {
@@ -163,46 +75,101 @@
blob = b.Bytes()
}
- if err := pr.kvStore.Put(pr.GetConfig().Hash, blob); err != nil {
+ if err := pr.kvStore.Put(pr.GetHash(), blob); err != nil {
log.Warnf("Problem storing revision config - error: %s, hash: %s, data: %+v", err.Error(),
- pr.GetConfig().Hash,
+ pr.GetHash(),
pr.GetConfig().Data)
} else {
- log.Debugf("Stored config - hash:%s, blob: %+v, stack: %s", pr.GetConfig().Hash, pr.GetConfig().Data,
+ log.Debugf("Stored config - hash:%s, blob: %+v, stack: %s", pr.GetHash(), pr.GetConfig().Data,
string(debug.Stack()))
}
}
}
-// loadConfig restores the data associated to a revision from the persistent storage
-func (pr *PersistedRevision) loadConfig(kvStore *Backend, msgClass interface{}, hash string) interface{} {
- blob, _ := kvStore.Get(hash)
- start := time.Now()
- output := blob.Value.([]byte)
+func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
+ var response []Revision
+ var rev Revision
- if pr.Compress {
- b := bytes.NewBuffer(blob.Value.([]byte))
- if r, err := gzip.NewReader(b); err != nil {
- // TODO : report error
+ rev = pr
+
+ if pr.kvStore != nil {
+ blobMap, _ := pr.kvStore.List(path)
+
+ partition := strings.SplitN(path, "/", 2)
+ name := partition[0]
+
+ if len(partition) < 2 {
+ path = ""
} else {
- if output, err = ioutil.ReadAll(r); err != nil {
- // TODO report error
+ path = partition[1]
+ }
+
+ field := ChildrenFields(rev.GetBranch().Node.Type)[name]
+
+ if field.IsContainer {
+ for _, blob := range blobMap {
+ output := blob.Value.([]byte)
+
+ data := reflect.New(field.ClassType.Elem())
+
+ if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
+ // TODO report error
+ } else {
+
+ var children []Revision
+
+ if path == "" {
+ if field.Key != "" {
+ // e.g. /logical_devices/abcde --> path="" name=logical_devices key=abcde
+ if field.Key != "" {
+ children = make([]Revision, len(rev.GetChildren()[name]))
+ copy(children, rev.GetChildren()[name])
+
+ _, key := GetAttributeValue(data.Interface(), field.Key, 0)
+
+ childRev := rev.GetBranch().Node.MakeNode(data.Interface(), txid).Latest(txid)
+ childRev.SetHash(name + "/" + key.String())
+ children = append(children, childRev)
+ rev = rev.UpdateChildren(name, children, rev.GetBranch())
+
+ rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
+
+ response = append(response, childRev)
+ continue
+ }
+ }
+ } else if field.Key != "" {
+ // e.g. /logical_devices/abcde/flows/vwxyz --> path=abcde/flows/vwxyz
+
+ partition := strings.SplitN(path, "/", 2)
+ key := partition[0]
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
+ keyValue := field.KeyFromStr(key)
+
+ children = make([]Revision, len(rev.GetChildren()[name]))
+ copy(children, rev.GetChildren()[name])
+
+ idx, childRev := rev.GetBranch().Node.findRevByKey(children, field.Key, keyValue)
+
+ newChildRev := childRev.LoadFromPersistence(path, txid)
+
+ children[idx] = newChildRev[0]
+
+ rev := rev.UpdateChildren(name, rev.GetChildren()[name], rev.GetBranch())
+ rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
+
+ response = append(response, newChildRev[0])
+ continue
+ }
+ }
}
}
}
-
- var data reflect.Value
- if msgClass != nil {
- data = reflect.New(reflect.TypeOf(msgClass).Elem())
- if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
- // TODO report error
- }
- }
-
- stop := time.Now()
-
- GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
- return data.Interface()
+ return response
}
// UpdateData modifies the information in the data model and saves it in the persistent storage
@@ -216,8 +183,6 @@
kvStore: pr.kvStore,
}
- newPR.Finalize()
-
return newPR
}
@@ -231,8 +196,6 @@
kvStore: pr.kvStore,
}
- newPR.Finalize()
-
return newPR
}
@@ -246,8 +209,6 @@
kvStore: pr.kvStore,
}
- newPR.Finalize()
-
return newPR
}
diff --git a/db/model/proxy.go b/db/model/proxy.go
index 65da561..402731a 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -55,56 +55,50 @@
// Proxy holds the information for a specific location with the data model
type Proxy struct {
sync.RWMutex
- Root *root
- Node *node
- Path string
- FullPath string
- Exclusive bool
- Callbacks map[CallbackType]map[string]*CallbackTuple
+ Root *root
+ Node *node
+ ParentNode *node
+ Path string
+ FullPath string
+ Exclusive bool
+ Callbacks map[CallbackType]map[string]*CallbackTuple
}
// NewProxy instantiates a new proxy to a specific location
-func NewProxy(root *root, node *node, path string, fullPath string, exclusive bool) *Proxy {
+func NewProxy(root *root, node *node, parentNode *node, path string, fullPath string, exclusive bool) *Proxy {
callbacks := make(map[CallbackType]map[string]*CallbackTuple)
if fullPath == "/" {
fullPath = ""
}
p := &Proxy{
- Root: root,
- Node: node,
- Exclusive: exclusive,
- Path: path,
- FullPath: fullPath,
- Callbacks: callbacks,
+ Root: root,
+ Node: node,
+ ParentNode: parentNode,
+ Exclusive: exclusive,
+ Path: path,
+ FullPath: fullPath,
+ Callbacks: callbacks,
}
return p
}
// GetRoot returns the root attribute of the proxy
func (p *Proxy) GetRoot() *root {
- p.Lock()
- defer p.Unlock()
return p.Root
}
// getPath returns the path attribute of the proxy
func (p *Proxy) getPath() string {
- p.Lock()
- defer p.Unlock()
return p.Path
}
// getFullPath returns the full path attribute of the proxy
func (p *Proxy) getFullPath() string {
- p.Lock()
- defer p.Unlock()
return p.FullPath
}
// getCallbacks returns the full list of callbacks associated to the proxy
func (p *Proxy) getCallbacks(callbackType CallbackType) map[string]*CallbackTuple {
- p.Lock()
- defer p.Unlock()
if cb, exists := p.Callbacks[callbackType]; exists {
return cb
}
@@ -113,8 +107,6 @@
// getCallback returns a specific callback matching the type and function hash
func (p *Proxy) getCallback(callbackType CallbackType, funcHash string) *CallbackTuple {
- p.Lock()
- defer p.Unlock()
if tuple, exists := p.Callbacks[callbackType][funcHash]; exists {
return tuple
}
@@ -146,7 +138,10 @@
// for locations that need to be access controlled.
func (p *Proxy) parseForControlledPath(path string) (pathLock string, controlled bool) {
// TODO: Add other path prefixes that may need control
- if strings.HasPrefix(path, "/devices") || strings.HasPrefix(path, "/logical_devices"){
+ if strings.HasPrefix(path, "/devices") ||
+ strings.HasPrefix(path, "/logical_devices") ||
+ strings.HasPrefix(path, "/adapters") {
+
split := strings.SplitN(path, "/", -1)
switch len(split) {
case 2:
diff --git a/db/model/proxy_access_control.go b/db/model/proxy_access_control.go
index e9a4ffa..f6169b6 100644
--- a/db/model/proxy_access_control.go
+++ b/db/model/proxy_access_control.go
@@ -167,7 +167,7 @@
// FIXME: Forcing depth to 0 for now due to problems deep copying the data structure
// The data traversal through reflection currently corrupts the content
- return pac.getProxy().GetRoot().Get(path, "", 0, deep, txid)
+ return pac.getProxy().GetRoot().Get(path, "", depth, deep, txid)
}
// Update changes the content of the data model at the specified location with the provided data
@@ -177,7 +177,12 @@
defer pac.unlock()
log.Debugf("controlling update, stack = %s", string(debug.Stack()))
}
- return pac.getProxy().GetRoot().Update(path, data, strict, txid, nil).GetData()
+ result := pac.getProxy().GetRoot().Update(path, data, strict, txid, nil)
+
+ if result != nil {
+ return result.GetData()
+ }
+ return nil
}
// Add creates a new data model entry at the specified location with the provided data
@@ -187,8 +192,12 @@
defer pac.unlock()
log.Debugf("controlling add, stack = %s", string(debug.Stack()))
}
- return pac.getProxy().GetRoot().Add(path, data, txid, nil).GetData()
+ result := pac.getProxy().GetRoot().Add(path, data, txid, nil)
+ if result != nil {
+ return result.GetData()
+ }
+ return nil
}
// Remove discards information linked to the data model path
diff --git a/db/model/revision.go b/db/model/revision.go
index 3912f67..a848bbf 100644
--- a/db/model/revision.go
+++ b/db/model/revision.go
@@ -16,7 +16,7 @@
package model
type Revision interface {
- Finalize()
+ Finalize(bool)
SetConfig(revision *DataRevision)
GetConfig() *DataRevision
Drop(txid string, includeConfig bool)
@@ -30,6 +30,7 @@
Get(int) interface{}
GetData() interface{}
GetNode() *node
+ LoadFromPersistence(path string, txid string) []Revision
UpdateData(data interface{}, branch *Branch) Revision
UpdateChildren(name string, children []Revision, branch *Branch) Revision
UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision
diff --git a/db/model/root.go b/db/model/root.go
index c4339a4..c97cb8c 100644
--- a/db/model/root.go
+++ b/db/model/root.go
@@ -19,17 +19,16 @@
import (
"encoding/hex"
"encoding/json"
+ "github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
"reflect"
"sync"
- "time"
)
// Root is used to provide an abstraction to the base root structure
type Root interface {
Node
- Load(rootClass interface{}) *root
ExecuteCallbacks()
AddCallback(callback CallbackFunction, args ...interface{})
@@ -165,6 +164,22 @@
r.NotificationCallbacks = append(r.NotificationCallbacks, CallbackTuple{callback, args})
}
+func (r *root) syncParent(childRev Revision, txid string) {
+ data := proto.Clone(r.Proxy.ParentNode.Latest().GetData().(proto.Message))
+
+ for fieldName, _ := range ChildrenFields(data) {
+ childDataName, childDataHolder := GetAttributeValue(data, fieldName, 0)
+ if reflect.TypeOf(childRev.GetData()) == reflect.TypeOf(childDataHolder.Interface()) {
+ childDataHolder = reflect.ValueOf(childRev.GetData())
+ reflect.ValueOf(data).Elem().FieldByName(childDataName).Set(childDataHolder)
+ }
+ }
+
+ r.Proxy.ParentNode.Latest().SetConfig(NewDataRevision(r.Proxy.ParentNode.Root, data))
+ r.Proxy.ParentNode.Latest(txid).Finalize(false)
+}
+
+
// Update modifies the content of an object at a given path with the provided data
func (r *root) Update(path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
var result Revision
@@ -187,6 +202,12 @@
result = r.node.Update(path, data, strict, "", nil)
}
+ if r.Proxy.FullPath != r.Proxy.Path {
+ r.syncParent(result, txid)
+ } else {
+ result.Finalize(false)
+ }
+
r.node.GetRoot().ExecuteCallbacks()
return result
@@ -214,8 +235,10 @@
result = r.node.Add(path, data, "", nil)
}
- r.node.GetRoot().ExecuteCallbacks()
-
+ if result != nil {
+ result.Finalize(true)
+ r.node.GetRoot().ExecuteCallbacks()
+ }
return result
}
@@ -246,15 +269,6 @@
return result
}
-// Load retrieves data from a persistent storage
-func (r *root) Load(rootClass interface{}) *root {
- //fakeKvStore := &Backend{}
- //root := NewRoot(rootClass, nil)
- //root.KvStore = r.KvStore
- r.loadFromPersistence(rootClass)
- return r
-}
-
// MakeLatest updates a branch with the latest node revision
func (r *root) MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
r.makeLatest(branch, revision, changeAnnouncement)
@@ -294,25 +308,4 @@
type rootData struct {
Latest string `json:latest`
Tags map[string]string `json:tags`
-}
-
-func (r *root) loadFromPersistence(rootClass interface{}) {
- var data rootData
-
- r.Loading = true
- blob, _ := r.KvStore.Get("root")
-
- start := time.Now()
- if err := json.Unmarshal(blob.Value.([]byte), &data); err != nil {
- log.Errorf("problem to unmarshal blob - error:%s\n", err.Error())
- }
- stop := time.Now()
- GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
- for tag, hash := range data.Tags {
- r.node.LoadLatest(hash)
- r.node.Tags[tag] = r.node.Latest()
- }
-
- r.node.LoadLatest(data.Latest)
- r.Loading = false
-}
+}
\ No newline at end of file
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 784b506..9e8710b 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -51,7 +51,9 @@
var agent DeviceAgent
agent.adapterProxy = ap
cloned := (proto.Clone(device)).(*voltha.Device)
- cloned.Id = CreateDeviceId()
+ if cloned.Id == "" {
+ cloned.Id = CreateDeviceId()
+ }
cloned.AdminState = voltha.AdminState_PREPROVISIONED
cloned.FlowGroups = &ofp.FlowGroups{Items: nil}
cloned.Flows = &ofp.Flows{Items: nil}
@@ -180,15 +182,13 @@
} else {
oldData = proto.Clone(storedData.Flows).(*voltha.Flows)
log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows, "old": oldData})
+
// store the changed data
- storedData.Flows.Items = flows
- afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, storedData, false, "")
+ afterUpdate := agent.flowProxy.Update("/", &ofp.Flows{Items: flows}, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
- // For now, force the callback to occur
- go agent.flowTableUpdated(oldData, &ofp.Flows{Items: flows})
return nil
}
}
@@ -196,21 +196,16 @@
func (agent *DeviceAgent) updateGroups(groups []*ofp.OfpGroupEntry) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
- var oldData *voltha.FlowGroups
log.Debugw("updateGroups", log.Fields{"deviceId": agent.deviceId, "groups": groups})
- if storedData, err := agent.getDeviceWithoutLock(); err != nil {
+ if _, err := agent.getDeviceWithoutLock(); err != nil {
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
- oldData = proto.Clone(storedData.FlowGroups).(*voltha.FlowGroups)
// store the changed data
- storedData.FlowGroups.Items = groups
- afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, storedData, false, "")
+ afterUpdate := agent.groupProxy.Update("/", &ofp.FlowGroups{Items: groups}, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
- // For now, force the callback to occur
- go agent.groupTableUpdated(oldData, &ofp.FlowGroups{Items: groups})
return nil
}
}
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 45584a1..682de48 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -18,7 +18,6 @@
import (
"context"
"errors"
- "github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/kafka"
@@ -184,20 +183,37 @@
return device.Root, nil
}
+// GetDevice retrieves the latest device information from the data model
func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
log.Debug("ListDevices")
result := &voltha.Devices{}
- dMgr.lockDeviceAgentsMap.Lock()
- defer dMgr.lockDeviceAgentsMap.Unlock()
- for _, agent := range dMgr.deviceAgents {
- if device, err := agent.getDevice(); err == nil {
- cloned := proto.Clone(device).(*voltha.Device)
- result.Items = append(result.Items, cloned)
+ if devices := dMgr.clusterDataProxy.Get("/devices", 0, false, ""); devices != nil {
+ for _, device := range devices.([]interface{}) {
+ if agent := dMgr.getDeviceAgent(device.(*voltha.Device).Id); agent == nil {
+ agent = newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
+ dMgr.addDeviceAgentToMap(agent)
+ agent.start(nil)
+ }
+ result.Items = append(result.Items, device.(*voltha.Device))
}
}
return result, nil
}
+//func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
+// log.Debug("ListDevices")
+// result := &voltha.Devices{}
+// dMgr.lockDeviceAgentsMap.Lock()
+// defer dMgr.lockDeviceAgentsMap.Unlock()
+// for _, agent := range dMgr.deviceAgents {
+// if device, err := agent.getDevice(); err == nil {
+// //cloned := proto.Clone(device).(*voltha.Device)
+// result.Items = append(result.Items, device)
+// }
+// }
+// return result, nil
+//}
+
func (dMgr *DeviceManager) updateDevice(device *voltha.Device) error {
log.Debugw("updateDevice", log.Fields{"deviceid": device.Id, "device": device})
if agent := dMgr.getDeviceAgent(device.Id); agent != nil {
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 8a69967..ea94788 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -50,12 +50,13 @@
flowDecomposer *fd.FlowDecomposer
}
-func newLogicalDeviceAgent(id string, device *voltha.Device, ldeviceMgr *LogicalDeviceManager, deviceMgr *DeviceManager,
+func newLogicalDeviceAgent(id string, deviceId string, ldeviceMgr *LogicalDeviceManager,
+ deviceMgr *DeviceManager,
cdProxy *model.Proxy) *LogicalDeviceAgent {
var agent LogicalDeviceAgent
agent.exitChannel = make(chan int, 1)
agent.logicalDeviceId = id
- agent.rootDeviceId = device.Id
+ agent.rootDeviceId = deviceId
agent.deviceMgr = deviceMgr
agent.clusterDataProxy = cdProxy
agent.ldeviceMgr = ldeviceMgr
@@ -148,8 +149,7 @@
defer agent.lockLogicalDevice.Unlock()
logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
- cloned := proto.Clone(lDevice).(*voltha.LogicalDevice)
- return cloned, nil
+ return lDevice, nil
}
return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
}
@@ -162,7 +162,7 @@
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
lPorts := make([]*voltha.LogicalPort, 0)
for _, port := range lDevice.Ports {
- lPorts = append(lPorts, proto.Clone(port).(*voltha.LogicalPort))
+ lPorts = append(lPorts, port)
}
return &voltha.LogicalPorts{Items: lPorts}, nil
}
@@ -195,31 +195,19 @@
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowsWithoutLock(flows *ofp.Flows) error {
- cloned := proto.Clone(flows).(*ofp.Flows)
- afterUpdate := agent.flowProxy.Update("/", cloned, false, "")
+ afterUpdate := agent.flowProxy.Update("/", flows, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-updating-logical-device-flows:%s", agent.logicalDeviceId)
}
- // TODO: Remove this code when the model update is fixed
- ld, _ := agent.getLogicalDeviceWithoutLock()
- clonedDevice := proto.Clone(ld).(*voltha.LogicalDevice)
- clonedDevice.Flows = proto.Clone(flows).(*ofp.Flows)
- agent.updateLogicalDeviceWithoutLock(clonedDevice)
return nil
}
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(flowGroups *ofp.FlowGroups) error {
- cloned := proto.Clone(flowGroups).(*ofp.FlowGroups)
- afterUpdate := agent.groupProxy.Update("/", cloned, false, "")
+ afterUpdate := agent.groupProxy.Update("/", flowGroups, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-updating-logical-device-flow-groups:%s", agent.logicalDeviceId)
}
- // TODO: Remove this code when the model update is fixed
- ld, _ := agent.getLogicalDeviceWithoutLock()
- clonedDevice := proto.Clone(ld).(*voltha.LogicalDevice)
- clonedDevice.FlowGroups = proto.Clone(flowGroups).(*ofp.FlowGroups)
- agent.updateLogicalDeviceWithoutLock(clonedDevice)
return nil
}
@@ -229,8 +217,7 @@
log.Debug("getLogicalDeviceWithoutLock")
logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
- cloned := proto.Clone(lDevice).(*voltha.LogicalDevice)
- return cloned, nil
+ return lDevice, nil
}
return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
}
@@ -260,7 +247,6 @@
return status.Error(codes.NotFound, agent.logicalDeviceId)
} else {
log.Infow("!!!!!!!!!!!ADDING-UNI", log.Fields{"deviceId": childDevice.Id})
- cloned := proto.Clone(ldevice).(*voltha.LogicalDevice)
portCap.Port.RootPort = false
//TODO: For now use the channel id assigned by the OLT as logical port number
lPortNo := childDevice.ProxyAddress.ChannelId
@@ -269,17 +255,15 @@
portCap.Port.OfpPort.Name = portCap.Port.Id
portCap.Port.DeviceId = childDevice.Id
portCap.Port.DevicePortNo = uniPort
- lp := proto.Clone(portCap.Port).(*voltha.LogicalPort)
- lp.DeviceId = childDevice.Id
- cloned.Ports = append(cloned.Ports, lp)
- return agent.updateLogicalDeviceWithoutLock(cloned)
+ portCap.Port.DeviceId = childDevice.Id
+ ldevice.Ports = append(ldevice.Ports, portCap.Port)
+ return agent.updateLogicalDeviceWithoutLock(ldevice)
}
}
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
func (agent *LogicalDeviceAgent) updateLogicalDeviceWithoutLock(logicalDevice *voltha.LogicalDevice) error {
- cloned := proto.Clone(logicalDevice).(*voltha.LogicalDevice)
- afterUpdate := agent.clusterDataProxy.Update("/logical_devices/"+agent.logicalDeviceId, cloned, false, "")
+ afterUpdate := agent.clusterDataProxy.Update("/logical_devices/"+agent.logicalDeviceId, logicalDevice, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-updating-logical-device:%s", agent.logicalDeviceId)
}
@@ -397,8 +381,6 @@
return err
}
}
- //// For now, force the callback to occur
- //go agent.flowTableUpdated(oldData, lDevice.Flows)
return nil
}
@@ -952,45 +934,36 @@
func (agent *LogicalDeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
- // Run this callback in it's own go routine since callbacks are not invoked in their own
- // go routine
- go func(args ...interface{}) interface{} {
- //agent.lockLogicalDevice.Lock()
- //defer agent.lockLogicalDevice.Unlock()
+ var previousData *ofp.Flows
+ var latestData *ofp.Flows
- var previousData *ofp.Flows
- var latestData *ofp.Flows
+ var ok bool
+ if previousData, ok = args[0].(*ofp.Flows); !ok {
+ log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+ }
+ if latestData, ok = args[1].(*ofp.Flows); !ok {
+ log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+ }
- var ok bool
- if previousData, ok = args[0].(*ofp.Flows); !ok {
- log.Errorw("invalid-args", log.Fields{"args0": args[0]})
- }
- if latestData, ok = args[1].(*ofp.Flows); !ok {
- log.Errorw("invalid-args", log.Fields{"args1": args[1]})
- }
-
- if reflect.DeepEqual(previousData.Items, latestData.Items) {
- log.Debug("flow-update-not-required")
- return nil
- }
-
- // Ensure the device graph has been setup
- agent.setupDeviceGraph()
-
- var groups *ofp.FlowGroups
- lDevice, _ := agent.getLogicalDeviceWithoutLock()
- groups = lDevice.FlowGroups
- log.Debugw("flowsinfo", log.Fields{"flows": latestData, "groups": groups})
- deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
- log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-
- for deviceId, value := range deviceRules.GetRules() {
- agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
- agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
- }
-
+ if reflect.DeepEqual(previousData.Items, latestData.Items) {
+ log.Debug("flow-update-not-required")
return nil
- }(args...)
+ }
+
+ // Ensure the device graph has been setup
+ agent.setupDeviceGraph()
+
+ var groups *ofp.FlowGroups
+ lDevice, _ := agent.getLogicalDeviceWithoutLock()
+ groups = lDevice.FlowGroups
+ log.Debugw("flowsinfo", log.Fields{"flows": latestData, "groups": groups})
+ deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
+ log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+ for deviceId, value := range deviceRules.GetRules() {
+ agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
+ agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
+ }
return nil
}
@@ -998,43 +971,35 @@
func (agent *LogicalDeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
- // Run this callback in it's own go routine since callbacks are not invoked in their own
- // go routine
- go func(args ...interface{}) interface{} {
- //agent.lockLogicalDevice.Lock()
- //defer agent.lockLogicalDevice.Unlock()
+ var previousData *ofp.FlowGroups
+ var latestData *ofp.FlowGroups
- var previousData *ofp.FlowGroups
- var latestData *ofp.FlowGroups
+ var ok bool
+ if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
+ log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+ }
+ if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
+ log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+ }
- var ok bool
- if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
- log.Errorw("invalid-args", log.Fields{"args0": args[0]})
- }
- if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
- log.Errorw("invalid-args", log.Fields{"args1": args[1]})
- }
-
- if reflect.DeepEqual(previousData.Items, latestData.Items) {
- log.Debug("flow-update-not-required")
- return nil
- }
-
- // Ensure the device graph has been setup
- agent.setupDeviceGraph()
-
- var flows *ofp.Flows
- lDevice, _ := agent.getLogicalDeviceWithoutLock()
- flows = lDevice.Flows
- log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
- deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
- log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
- for deviceId, value := range deviceRules.GetRules() {
- agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
- agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
- }
+ if reflect.DeepEqual(previousData.Items, latestData.Items) {
+ log.Debug("flow-update-not-required")
return nil
- }(args...)
+ }
+
+ // Ensure the device graph has been setup
+ agent.setupDeviceGraph()
+
+ var flows *ofp.Flows
+ lDevice, _ := agent.getLogicalDeviceWithoutLock()
+ flows = lDevice.Flows
+ log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
+ deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
+ log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+ for deviceId, value := range deviceRules.GetRules() {
+ agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
+ agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
+ }
return nil
}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 64743cc..5f9cea2 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -106,18 +106,40 @@
}
func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
- log.Debug("listLogicalDevices")
+ log.Debug("ListAllLogicalDevices")
result := &voltha.LogicalDevices{}
- ldMgr.lockLogicalDeviceAgentsMap.Lock()
- defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
- for _, agent := range ldMgr.logicalDeviceAgents {
- if lDevice, err := agent.GetLogicalDevice(); err == nil {
- result.Items = append(result.Items, lDevice)
+ if logicalDevices := ldMgr.clusterDataProxy.Get("/logical_devices", 0, false, ""); logicalDevices != nil {
+ for _, logicalDevice := range logicalDevices.([]interface{}) {
+ if agent := ldMgr.getLogicalDeviceAgent(logicalDevice.(*voltha.LogicalDevice).Id); agent == nil {
+ agent = newLogicalDeviceAgent(
+ logicalDevice.(*voltha.LogicalDevice).Id,
+ logicalDevice.(*voltha.LogicalDevice).RootDeviceId,
+ ldMgr,
+ ldMgr.deviceMgr,
+ ldMgr.clusterDataProxy,
+ )
+ ldMgr.addLogicalDeviceAgentToMap(agent)
+ go agent.start(nil)
+ }
+ result.Items = append(result.Items, logicalDevice.(*voltha.LogicalDevice))
}
}
return result, nil
}
+//func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
+// log.Debug("listLogicalDevices")
+// result := &voltha.LogicalDevices{}
+// ldMgr.lockLogicalDeviceAgentsMap.Lock()
+// defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
+// for _, agent := range ldMgr.logicalDeviceAgents {
+// if lDevice, err := agent.GetLogicalDevice(); err == nil {
+// result.Items = append(result.Items, lDevice)
+// }
+// }
+// return result, nil
+//}
+
func (ldMgr *LogicalDeviceManager) createLogicalDevice(ctx context.Context, device *voltha.Device) (*string, error) {
log.Debugw("creating-logical-device", log.Fields{"deviceId": device.Id})
// Sanity check
@@ -137,7 +159,7 @@
}
log.Debugw("logical-device-id", log.Fields{"logicaldeviceId": id})
- agent := newLogicalDeviceAgent(id, device, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
+ agent := newLogicalDeviceAgent(id, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
ldMgr.addLogicalDeviceAgentToMap(agent)
go agent.start(ctx)