// blob: 3ae56144c7666a5a364b20ab32effa098d076fc3
/*
* Copyright 2018-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package model
import (
"context"
"encoding/hex"
"encoding/json"
"reflect"
"sync"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/opencord/voltha-lib-go/v3/pkg/db"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
// Root is used to provide an abstraction to the base root structure.
// It extends Node with the callback-management operations listed below.
type Root interface {
Node
// ExecuteCallbacks runs the callbacks that are currently registered.
ExecuteCallbacks(ctx context.Context)
// AddCallback registers a callback together with the arguments given for it.
AddCallback(callback CallbackFunction, args ...interface{})
// AddNotificationCallback registers a notification callback together with
// the arguments given for it.
AddNotificationCallback(callback CallbackFunction, args ...interface{})
}
// root points to the top of the data model tree or sub-tree identified by a proxy
type root struct {
// Embedded node at the top of the tree; assigned by NewRoot.
*node
// Callbacks is the queue of pending callbacks; ExecuteCallbacks drains it.
Callbacks []CallbackTuple
// NotificationCallbacks collects the callbacks added through
// AddNotificationCallback. The code that would drain this queue in
// ExecuteCallbacks is currently commented out.
NotificationCallbacks []CallbackTuple
// DirtyNodes records, per transaction id, the nodes on which a transaction
// branch was made, so that DeleteTxBranch can remove those branches.
DirtyNodes map[string][]*node
// KvStore is the optional storage backend; when nil, NewRoot selects
// non-persisted revisions and makeLatest skips writing the root entry.
KvStore *db.Backend
// Loading is initialised to false by NewRoot.
// NOTE(review): not read anywhere in this file - confirm where it is used.
Loading bool
// RevisionClass holds the reflect.Type of either PersistedRevision or
// NonPersistedRevision, as chosen by NewRoot; MakeRevision switches on it.
RevisionClass interface{}
// mutex is taken by the methods that read or modify Callbacks and
// NotificationCallbacks.
mutex sync.RWMutex
}
// NewRoot creates a new instance of a root object.
//
// initialData is handed to newNode to build the top node of the tree.
// kvStore is the optional storage backend: when it is nil the root falls back
// to non-persisted revisions, otherwise persisted revisions are used.
func NewRoot(initialData interface{}, kvStore *db.Backend) Root {
	// Pick the revision flavour according to the availability of storage.
	var revisionClass interface{}
	if kvStore == nil {
		revisionClass = reflect.TypeOf(NonPersistedRevision{})
	} else {
		revisionClass = reflect.TypeOf(PersistedRevision{})
	}

	r := &root{
		Callbacks:             []CallbackTuple{},
		NotificationCallbacks: []CallbackTuple{},
		DirtyNodes:            make(map[string][]*node),
		KvStore:               kvStore,
		Loading:               false,
		RevisionClass:         revisionClass,
	}
	// The node needs a reference to its root, so it is attached last.
	r.node = newNode(r, initialData, false, "")
	return r
}
// MakeTxBranch creates a new transaction branch on the root node.
//
// The returned transaction id is the first 12 hexadecimal characters of a
// freshly generated UUID. The root node is recorded as the first dirty node of
// that transaction before the branch itself is made.
func (r *root) MakeTxBranch() string {
	id := uuid.New()
	txid := hex.EncodeToString(id[:])[:12]

	r.DirtyNodes[txid] = []*node{r.node}
	r.node.MakeBranch(txid)

	return txid
}
// DeleteTxBranch removes a transaction branch.
//
// The branch is deleted from every node recorded as dirty for txid, the
// transaction's entry is dropped from DirtyNodes, and finally the branch is
// deleted from the root node itself.
func (r *root) DeleteTxBranch(txid string) {
	dirty := r.DirtyNodes[txid]
	for i := range dirty {
		dirty[i].DeleteBranch(txid)
	}

	delete(r.DirtyNodes, txid)
	r.node.DeleteBranch(txid)
}
// FoldTxBranch will merge the contents of a transaction branch with the root object.
//
// A dry-run merge is attempted first; if it fails the branch is simply deleted.
// Otherwise the real merge is performed (a failure there is only logged), the
// registered callbacks are executed and the branch is deleted.
func (r *root) FoldTxBranch(ctx context.Context, txid string) {
	// Dry run: bail out and discard the branch when the merge cannot succeed.
	if _, dryRunErr := r.node.MergeBranch(ctx, txid, true); dryRunErr != nil {
		r.DeleteTxBranch(txid)
		return
	}

	_, mergeErr := r.node.MergeBranch(ctx, txid, false)
	if mergeErr != nil {
		log.Errorw("Unable to integrate the contents of a transaction branch within the latest branch of a given node", log.Fields{"error": mergeErr})
	}

	r.node.GetRoot().ExecuteCallbacks(ctx)
	r.DeleteTxBranch(txid)
}
// ExecuteCallbacks will invoke all the callbacks linked to root object.
//
// While holding the root mutex, the callback queue is emptied and each queued
// callback is started in its own goroutine. Notification callbacks are not
// executed (the corresponding code is commented out below).
func (r *root) ExecuteCallbacks(ctx context.Context) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	// Take the whole queue and leave an empty tail slice behind.
	pending := r.Callbacks
	r.Callbacks = pending[len(pending):]

	for i := range pending {
		// Each goroutine works on its own copy of the tuple.
		cb := pending[i]
		go cb.Execute(ctx, nil)
	}
	//for len(r.NotificationCallbacks) > 0 {
	//	callback := r.NotificationCallbacks[0]
	//	r.NotificationCallbacks = r.NotificationCallbacks[1:]
	//	go callback.Execute(nil)
	//}
}
// GetCallbacks returns the callbacks that are currently queued on the root.
//
// The returned slice is the root's own slice, not a copy. Only a read lock is
// taken since the method does not modify the queue.
func (r *root) GetCallbacks() []CallbackTuple {
	r.mutex.RLock()
	defer r.mutex.RUnlock()
	return r.Callbacks
}
// GetNotificationCallbacks returns the notification callbacks that are
// currently queued on the root.
//
// The returned slice is the root's own slice, not a copy. Only a read lock is
// taken since the method does not modify the queue.
func (r *root) GetNotificationCallbacks() []CallbackTuple {
	r.mutex.RLock()
	defer r.mutex.RUnlock()
	return r.NotificationCallbacks
}
// AddCallback inserts a new callback with its arguments.
//
// The callback and its arguments are bundled into a CallbackTuple that is
// appended to the callback queue under the root mutex.
func (r *root) AddCallback(callback CallbackFunction, args ...interface{}) {
	entry := CallbackTuple{callback, args}

	r.mutex.Lock()
	defer r.mutex.Unlock()
	r.Callbacks = append(r.Callbacks, entry)
}
// AddNotificationCallback inserts a new notification callback with its arguments.
//
// The callback and its arguments are bundled into a CallbackTuple that is
// appended to the notification callback queue under the root mutex.
func (r *root) AddNotificationCallback(callback CallbackFunction, args ...interface{}) {
	entry := CallbackTuple{callback, args}

	r.mutex.Lock()
	defer r.mutex.Unlock()
	r.NotificationCallbacks = append(r.NotificationCallbacks, entry)
}
// syncParent copies the data of childRev into the matching field of the parent
// node's data and installs the result as the parent's latest configuration.
//
// A clone of the parent's latest data is walked through its children fields;
// every field whose value has the same Go type as the child revision's data is
// overwritten with that data. The clone then becomes the config of the parent's
// latest revision, and the parent's latest revision for txid is finalized.
// Update calls this when the proxy's FullPath differs from its Path.
func (r *root) syncParent(ctx context.Context, childRev Revision, txid string) {
	parentData := proto.Clone(r.GetProxy().ParentNode.Latest().GetData().(proto.Message))

	for field := range ChildrenFields(parentData) {
		attrName, attrValue := GetAttributeValue(parentData, field, 0)
		if reflect.TypeOf(childRev.GetData()) != reflect.TypeOf(attrValue.Interface()) {
			// Not the field holding this kind of child; leave it untouched.
			continue
		}
		replacement := reflect.ValueOf(childRev.GetData())
		reflect.ValueOf(parentData).Elem().FieldByName(attrName).Set(replacement)
	}

	r.GetProxy().ParentNode.Latest().SetConfig(NewDataRevision(r.GetProxy().ParentNode.GetRoot(), parentData))
	r.GetProxy().ParentNode.Latest(txid).Finalize(ctx, false)
}
// Update modifies the content of an object at a given path with the provided data.
//
// When txid is non-empty the update is applied within that transaction and
// every node that gets branched is recorded in DirtyNodes. A non-nil result is
// either propagated to the parent (when the proxy's FullPath differs from its
// Path) or finalized directly. Callbacks are executed in all cases.
// The makeBranch argument is not used by this implementation.
// Returns the revision produced by the underlying node update, possibly nil.
func (r *root) Update(ctx context.Context, path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
	// Branch tracking is only needed inside a transaction.
	var branchMaker MakeBranchFunction
	if txid != "" {
		branchMaker = func(n *node) *Branch {
			r.DirtyNodes[txid] = append(r.DirtyNodes[txid], n)
			return n.MakeBranch(txid)
		}
	}

	updated := r.node.Update(ctx, path, data, strict, txid, branchMaker)

	switch {
	case updated == nil:
		// Nothing was produced; nothing to finalize.
	case r.GetProxy().FullPath != r.GetProxy().Path:
		r.syncParent(ctx, updated, txid)
	default:
		updated.Finalize(ctx, false)
	}

	r.node.GetRoot().ExecuteCallbacks(ctx)
	return updated
}
// Add creates a new object at the given path with the provided data.
//
// When txid is non-empty the addition is applied within that transaction and
// every node that gets branched is recorded in DirtyNodes. A non-nil result is
// finalized and the callbacks are executed; a nil result is returned as is.
// The makeBranch argument is not used by this implementation.
func (r *root) Add(ctx context.Context, path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision {
	// Branch tracking is only needed inside a transaction.
	var branchMaker MakeBranchFunction
	if txid != "" {
		branchMaker = func(n *node) *Branch {
			r.DirtyNodes[txid] = append(r.DirtyNodes[txid], n)
			return n.MakeBranch(txid)
		}
	}

	added := r.node.Add(ctx, path, data, txid, branchMaker)
	if added == nil {
		return added
	}

	added.Finalize(ctx, true)
	r.node.GetRoot().ExecuteCallbacks(ctx)
	return added
}
// Remove discards an object at a given path.
//
// When txid is non-empty the removal is applied within that transaction and
// every node that gets branched is recorded in DirtyNodes. Callbacks are
// executed regardless of the outcome.
// The makeBranch argument is not used by this implementation.
// Returns the revision produced by the underlying node removal, possibly nil.
func (r *root) Remove(ctx context.Context, path string, txid string, makeBranch MakeBranchFunction) Revision {
	// Branch tracking is only needed inside a transaction.
	var branchMaker MakeBranchFunction
	if txid != "" {
		branchMaker = func(n *node) *Branch {
			r.DirtyNodes[txid] = append(r.DirtyNodes[txid], n)
			return n.MakeBranch(txid)
		}
	}

	removed := r.node.Remove(ctx, path, txid, branchMaker)
	r.node.GetRoot().ExecuteCallbacks(ctx)
	return removed
}
// MakeLatest updates a branch with the latest node revision.
// It is the exported entry point and forwards its arguments unchanged to the
// unexported makeLatest method of the root.
func (r *root) MakeLatest(ctx context.Context, branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
r.makeLatest(ctx, branch, revision, changeAnnouncement)
}
// MakeRevision creates a revision of the flavour selected for this root.
//
// A persisted revision is built when RevisionClass is the PersistedRevision
// type; in every other case a non-persisted revision is built.
func (r *root) MakeRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
	persistedType := reflect.TypeOf(PersistedRevision{})
	if r.RevisionClass.(reflect.Type) != persistedType {
		return NewNonPersistedRevision(r, branch, data, children)
	}
	return NewPersistedRevision(branch, data, children)
}
// makeLatest makes revision the latest one of branch on the root node and, for
// a persisted root, records the new state in the KV store.
//
// The KV store entry is only written when a store is configured and the branch
// is not a transaction branch (empty Txid). The entry, stored under the "root"
// key, is the JSON encoding of a rootData holding the hash of the branch's
// latest revision and the hash of every tagged revision of the root node.
// Encoding and storage failures are logged; they are not returned.
func (r *root) makeLatest(ctx context.Context, branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
	r.node.makeLatest(branch, revision, changeAnnouncement)

	// Nothing to persist without a store or while inside a transaction.
	if r.KvStore == nil || branch.Txid != "" {
		return
	}

	tags := make(map[string]string, len(r.node.Tags))
	for k, v := range r.node.Tags {
		tags[k] = v.GetHash()
	}
	data := &rootData{
		Latest: branch.GetLatest().GetHash(),
		Tags:   tags,
	}

	blob, err := json.Marshal(data)
	if err != nil {
		// Previously this failure was silently dropped.
		log.Errorf("failed to marshal root data - err: %s", err.Error())
		return
	}

	log.Debugf("Changing root to : %s", string(blob))
	if putErr := r.KvStore.Put(ctx, "root", blob); putErr != nil {
		log.Errorf("failed to properly put value in kvstore - err: %s", putErr.Error())
	}
}
// rootData is the JSON document that makeLatest writes to the KV store under
// the "root" key.
type rootData struct {
// Latest is the hash of the latest revision of the branch being recorded.
Latest string `json:"latest"`
// Tags maps each key of the root node's Tags to the hash of the
// corresponding revision.
Tags map[string]string `json:"tags"`
}