VOL-1176: Ported callback mechanism
- Updated some files with the license block
Change-Id: I61db400b3a72c4915f4f3f17cc7a110313c2d25e
diff --git a/db/model/event_bus.go b/db/model/event_bus.go
index c20eac3..6afc63b 100644
--- a/db/model/event_bus.go
+++ b/db/model/event_bus.go
@@ -44,7 +44,12 @@
return bus
}
-func (bus *EventBus) Advertise(eventType CallbackType, data interface{}, hash string) {
+//func (bus *EventBus) Advertise(eventType CallbackType, data interface{}, hash string) {
+func (bus *EventBus) Advertise(args ...interface{}) interface{} {
+ eventType := args[0].(CallbackType)
+ data := args[1]
+ hash := args[2].(string)
+
if _, ok := IGNORED_CALLBACKS[eventType]; ok {
fmt.Printf("ignoring event - type:%s, data:%+v\n", eventType, data)
}
@@ -75,4 +80,6 @@
}
bus.client.Publish(bus.topic, event)
+
+ return nil
}
diff --git a/db/model/merge.go b/db/model/merge.go
index 8ff89c4..97257b4 100644
--- a/db/model/merge.go
+++ b/db/model/merge.go
@@ -87,7 +87,7 @@
func Merge3Way(
forkRev, srcRev, dstRev Revision,
mergeChildFunc func(Revision) Revision,
- dryRun bool) (rev Revision, changes map[CallbackType][]interface{}) {
+ dryRun bool) (rev Revision, changes []ChangeTuple) {
var configChanged bool
@@ -124,9 +124,9 @@
newChildren[fieldName] = append(newChildren[fieldName], mergeChildFunc(rev))
}
if field.IsContainer {
- changes[POST_LISTCHANGE] = append(
- changes[POST_LISTCHANGE],
- NewOperationContext("", nil, fieldName, ""),
+ changes = append(
+ changes, ChangeTuple{POST_LISTCHANGE,
+ NewOperationContext("", nil, fieldName, "")},
)
}
}
@@ -146,18 +146,11 @@
newRev := mergeChildFunc(newList[idx])
newList[idx] = newRev
- changes[POST_ADD] = append(
- changes[POST_ADD],
- newRev.GetData(),
- )
+ changes = append(changes, ChangeTuple{POST_ADD,newRev.GetData()})
}
for key, _ := range src.RemovedKeys {
oldRev := forkList[src.KeyMap1[key]]
-
- changes[POST_REMOVE] = append(
- changes[POST_REMOVE],
- oldRev.GetData(),
- )
+ changes = append(changes, ChangeTuple{POST_REMOVE,oldRev.GetData()})
}
for key, _ := range src.ChangedKeys {
idx := src.KeyMap2[key]
@@ -184,10 +177,7 @@
} else {
newRev := mergeChildFunc(srcList[src.KeyMap2[key]])
newList = append(newList, newRev)
- changes[POST_ADD] = append(
- changes[POST_ADD],
- newRev.GetData(),
- )
+ changes = append(changes, ChangeTuple{POST_ADD,newRev.GetData()})
}
}
for key, _ := range src.ChangedKeys {
@@ -223,10 +213,7 @@
newList[len(newList)-1] = nil
newList = newList[:len(newList)-1]
- changes[POST_REMOVE] = append(
- changes[POST_REMOVE],
- oldRev.GetData(),
- )
+ changes = append(changes, ChangeTuple{POST_REMOVE,oldRev.GetData()})
}
}
@@ -246,10 +233,7 @@
rev = rev.UpdateAllChildren(newChildren, dstRev.GetBranch())
if configChanged {
- changes[POST_UPDATE] = append(
- changes[POST_UPDATE],
- rev.GetData(),
- )
+ changes = append(changes, ChangeTuple{POST_UPDATE,rev.GetData()})
}
return rev, changes
}
diff --git a/db/model/node.go b/db/model/node.go
index 90ab666..77e0cc7 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -37,6 +37,11 @@
AutoPrune bool
}
+type ChangeTuple struct {
+ Type CallbackType
+ Data interface{}
+}
+
func NewNode(root *Root, initialData interface{}, autoPrune bool, txid string) *Node {
n := &Node{}
@@ -69,7 +74,7 @@
return n.root.MakeRevision(branch, data, children)
}
-func (n *Node) MakeLatest(branch *Branch, revision Revision, changeAnnouncement map[CallbackType][]interface{}) {
+func (n *Node) MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
if _, ok := branch.Revisions[revision.GetHash()]; !ok {
branch.Revisions[revision.GetHash()] = revision
}
@@ -80,15 +85,17 @@
if changeAnnouncement != nil && branch.Txid == "" {
if n.Proxy != nil {
- for changeType, data := range changeAnnouncement {
+ for _, change := range changeAnnouncement {
// TODO: Invoke callback
- fmt.Printf("invoking callback - changeType: %+v, data:%+v\n", changeType, data)
+ fmt.Printf("invoking callback - changeType: %+v, data:%+v\n", change.Type, change.Data)
+ n.root.addCallback(n.Proxy.InvokeCallbacks, change.Type, change.Data, true)
}
}
- for changeType, data := range changeAnnouncement {
+ for _, change := range changeAnnouncement {
// TODO: send notifications
- fmt.Printf("sending notification - changeType: %+v, data:%+v\n", changeType, data)
+ fmt.Printf("sending notification - changeType: %+v, data:%+v\n", change.Type, change.Data)
+ n.root.addNotificationCallback(n.makeEventBus().Advertise, change.Type, change.Data, revision.GetHash())
}
}
}
@@ -266,8 +273,9 @@
msg := rev.Get(depth)
if n.Proxy != nil {
- // TODO: invoke GET callback
- fmt.Println("invoking proxy GET Callbacks")
+ log.Debug("invoking proxy GET Callbacks")
+ msg = n.Proxy.InvokeCallbacks(GET, msg, false)
+
}
return msg
}
@@ -373,8 +381,8 @@
//}
if n.Proxy != nil {
- // TODO: n.proxy.InvokeCallbacks(CallbackType.PRE_UPDATE, data)
- fmt.Println("invoking proxy PRE_UPDATE Callbacks")
+ log.Debug("invoking proxy PRE_UPDATE Callbacks")
+ n.Proxy.InvokeCallbacks(PRE_UPDATE, data, false)
}
if !reflect.DeepEqual(branch.Latest.GetData(), data) {
if strict {
@@ -428,8 +436,8 @@
if path == "" {
if field.Key != "" {
if n.Proxy != nil {
- // TODO -> n.proxy.InvokeCallbacks(PRE_ADD, data)
- fmt.Println("invoking proxy PRE_ADD Callbacks")
+ log.Debug("invoking proxy PRE_ADD Callbacks")
+ n.Proxy.InvokeCallbacks(PRE_ADD, data, false)
}
for _, v := range rev.GetChildren()[name] {
@@ -508,7 +516,7 @@
field := ChildrenFields(n.Type)[name]
var children []Revision
- post_anno := make(map[CallbackType][]interface{})
+ postAnnouncement := []ChangeTuple{}
if field.IsContainer {
if path == "" {
@@ -542,14 +550,14 @@
idx, childRev := n.findRevByKey(children, field.Key, key)
if n.Proxy != nil {
data := childRev.GetData()
- fmt.Println("invoking proxy PRE_REMOVE Callbacks")
- fmt.Printf("setting POST_REMOVE Callbacks : %+v\n", data)
+ n.Proxy.InvokeCallbacks(PRE_REMOVE, data, false)
+ postAnnouncement = append(postAnnouncement, ChangeTuple{POST_REMOVE, data})
} else {
- fmt.Println("setting POST_REMOVE Callbacks")
+ postAnnouncement = append(postAnnouncement, ChangeTuple{POST_REMOVE, childRev.GetData()})
}
children = append(children[:idx], children[idx+1:]...)
rev := rev.UpdateChildren(name, children, branch)
- n.root.MakeLatest(branch, rev, post_anno)
+ n.root.MakeLatest(branch, rev, postAnnouncement)
return rev
}
} else {
diff --git a/db/model/proxy.go b/db/model/proxy.go
index 8085eaf..79087a3 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -16,9 +16,13 @@
package model
import (
- "context"
"fmt"
"strings"
+ "reflect"
+ "crypto/md5"
+ "github.com/opencord/voltha-go/common/log"
+ "errors"
+ "runtime"
)
type OperationContext struct {
@@ -48,16 +52,17 @@
Node *Node
Path string
Exclusive bool
- Callbacks []interface{}
+ Callbacks map[CallbackType]map[string]CallbackTuple
}
func NewProxy(root *Root, node *Node, path string, exclusive bool) *Proxy {
+ callbacks := make(map[CallbackType]map[string]CallbackTuple)
p := &Proxy{
Root: root,
Node: node,
Exclusive: exclusive,
Path: path,
- Callbacks: []interface{}{},
+ Callbacks: callbacks,
}
return p
}
@@ -121,11 +126,81 @@
p.Root.DeleteTxBranch(txid)
}
-func (p *Proxy) RegisterCallback(callbackType CallbackType, callback func(), args ...interface{}) {
+//type CallbackFunction func(context context.Context, args ...interface{})
+type CallbackFunction func(args ...interface{}) interface{}
+type CallbackTuple struct {
+ callback CallbackFunction
+ args []interface{}
}
-func (p *Proxy) UnregisterCallback(callbackType CallbackType, callback func(), args ...interface{}) {
+func (tuple *CallbackTuple) Execute(context interface{}) interface{} {
+ newArgs := []interface{}{}
+ if context != nil {
+ newArgs = append(newArgs, context)
+ }
+ newArgs = append(newArgs, tuple.args...)
+ return tuple.callback(newArgs...)
}
-func (p *Proxy) InvokeCallback(callbackType CallbackType, context context.Context, proceedOnError bool) {
+func (p *Proxy) RegisterCallback(callbackType CallbackType, callback CallbackFunction, args ...interface{}) {
+ if _, exists := p.Callbacks[callbackType]; !exists {
+ p.Callbacks[callbackType] = make(map[string]CallbackTuple)
+ }
+ funcName := runtime.FuncForPC(reflect.ValueOf(callback).Pointer()).Name()
+ log.Debugf("value of function: %s", funcName)
+ funcHash := fmt.Sprintf("%x", md5.Sum([]byte(funcName)))[:12]
+
+ p.Callbacks[callbackType][funcHash] = CallbackTuple{callback, args}
+}
+
+func (p *Proxy) UnregisterCallback(callbackType CallbackType, callback CallbackFunction, args ...interface{}) {
+ if _, exists := p.Callbacks[callbackType]; !exists {
+ log.Errorf("no such callback type - %s", callbackType.String())
+ return
+ }
+ // TODO: Not sure if this is the best way to do it.
+ funcName := runtime.FuncForPC(reflect.ValueOf(callback).Pointer()).Name()
+ log.Debugf("value of function: %s", funcName)
+ funcHash := fmt.Sprintf("%x", md5.Sum([]byte(funcName)))[:12]
+ if _, exists := p.Callbacks[callbackType][funcHash]; !exists {
+ log.Errorf("function with hash value: '%s' not registered with callback type: '%s'", funcHash, callbackType)
+ return
+ }
+ delete(p.Callbacks[callbackType], funcHash)
+}
+
+func (p *Proxy) invoke(callback CallbackTuple, context interface{}) (result interface{}, err error) {
+ defer func() {
+ if r := recover(); r != nil {
+ errStr := fmt.Sprintf("callback error occurred: %+v", r)
+ err = errors.New(errStr)
+ log.Error(errStr)
+ }
+ }()
+
+ result = callback.Execute(context)
+
+ return result, err
+}
+
+//func (p *Proxy) InvokeCallbacks(callbackType CallbackType, context context.Context, proceedOnError bool) {
+func (p *Proxy) InvokeCallbacks(args ...interface{}) interface{} {
+ callbackType := args[0].(CallbackType)
+ context := args[1]
+ proceedOnError := args[2].(bool)
+
+ var err error
+
+ if _, exists := p.Callbacks[callbackType]; exists {
+ for _, callback := range p.Callbacks[callbackType] {
+ if context, err = p.invoke(callback, context); err != nil {
+ if !proceedOnError {
+ log.Info("An error occurred. Stopping callback invocation")
+ break
+ }
+ log.Info("An error occurred. Invoking next callback")
+ }
+ }
+ }
+ return context
}
diff --git a/db/model/proxy_test.go b/db/model/proxy_test.go
index 1824d3f..0984016 100644
--- a/db/model/proxy_test.go
+++ b/db/model/proxy_test.go
@@ -24,6 +24,7 @@
"github.com/google/uuid"
"encoding/hex"
"encoding/json"
+ "fmt"
)
type proxyTest struct {
@@ -46,7 +47,7 @@
DbPort: 2379,
DbTimeout: 5,
}
- devId string
+ devId string
targetDeviceId string
)
@@ -56,8 +57,6 @@
}
defer log.CleanUp()
-}
-func Test_Proxy_0_GetRootProxy(t *testing.T) {
pt.Backend = NewBackend(pt.DbType, pt.DbHost, pt.DbPort, pt.DbTimeout, pt.DbPrefix)
msgClass := &voltha.Voltha{}
@@ -69,6 +68,18 @@
pt.Proxy = pt.Root.Node.GetProxy("/", false)
}
+//func Test_Proxy_0_GetRootProxy(t *testing.T) {
+// pt.Backend = NewBackend(pt.DbType, pt.DbHost, pt.DbPort, pt.DbTimeout, pt.DbPrefix)
+//
+// msgClass := &voltha.Voltha{}
+// root := NewRoot(msgClass, pt.Backend, nil)
+// pt.Root = root.Load(msgClass)
+//
+// GetProfiling().Report()
+//
+// pt.Proxy = pt.Root.Node.GetProxy("/", false)
+//}
+
func Test_Proxy_1_GetDevices(t *testing.T) {
devices := pt.Proxy.Get("/devices", 1, false, "")
@@ -83,7 +94,7 @@
func Test_Proxy_2_GetDevice(t *testing.T) {
basePath := "/devices/" + targetDeviceId
- device1 := pt.Proxy.Get(basePath + "/ports", 1, false, "")
+ device1 := pt.Proxy.Get(basePath+"/ports", 1, false, "")
t.Logf("retrieved device with ports: %+v", device1)
device2 := pt.Proxy.Get(basePath, 0, false, "")
@@ -157,10 +168,10 @@
devId = "0001" + hex.EncodeToString(devIdBin)[:12]
device := &voltha.Device{
- Id: devId,
- Type: "simulated_olt",
- Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
- AdminState: voltha.AdminState_PREPROVISIONED,
+ Id: devId,
+ Type: "simulated_olt",
+ Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
+ AdminState: voltha.AdminState_PREPROVISIONED,
}
if added := pt.Proxy.Add("/devices", device, ""); added == nil {
@@ -171,7 +182,7 @@
}
func Test_Proxy_4_CheckAddedDevice(t *testing.T) {
- if d := pt.Proxy.Get("/devices/" + devId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ if d := pt.Proxy.Get("/devices/"+devId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find added device")
} else {
djson, _ := json.Marshal(d)
@@ -181,7 +192,7 @@
}
func Test_Proxy_5_UpdateDevice(t *testing.T) {
- if retrieved := pt.Proxy.Get("/devices/" + targetDeviceId, 1, false, ""); retrieved == nil {
+ if retrieved := pt.Proxy.Get("/devices/"+targetDeviceId, 1, false, ""); retrieved == nil {
t.Error("Failed to get device")
} else {
var fwVersion int
@@ -196,7 +207,7 @@
cloned.FirmwareVersion = strconv.Itoa(fwVersion)
t.Logf("Before update : %+v", cloned)
- if afterUpdate := pt.Proxy.Update("/devices/" + targetDeviceId, &cloned, false, ""); afterUpdate == nil {
+ if afterUpdate := pt.Proxy.Update("/devices/"+targetDeviceId, &cloned, false, ""); afterUpdate == nil {
t.Error("Failed to update device")
} else {
t.Logf("Updated device : %+v", afterUpdate.(Revision).GetData())
@@ -205,13 +216,13 @@
}
func Test_Proxy_6_CheckUpdatedDevice(t *testing.T) {
- device := pt.Proxy.Get("/devices/" + targetDeviceId, 0, false, "")
+ device := pt.Proxy.Get("/devices/"+targetDeviceId, 0, false, "")
t.Logf("content of updated device: %+v", device)
}
func Test_Proxy_7_RemoveDevice(t *testing.T) {
- if removed := pt.Proxy.Remove("/devices/" + devId, ""); removed == nil {
+ if removed := pt.Proxy.Remove("/devices/"+devId, ""); removed == nil {
t.Error("Failed to remove device")
} else {
t.Logf("Removed device : %+v", removed)
@@ -219,10 +230,58 @@
}
func Test_Proxy_8_CheckRemovedDevice(t *testing.T) {
- if d := pt.Proxy.Get("/devices/" + devId, 0, false, ""); reflect.ValueOf(d).IsValid() {
+ if d := pt.Proxy.Get("/devices/"+devId, 0, false, ""); reflect.ValueOf(d).IsValid() {
djson, _ := json.Marshal(d)
t.Errorf("Device was not removed - %s", djson)
} else {
t.Logf("Device was removed: %s", devId)
}
}
+
+// -----------------------------
+// Callback tests
+// -----------------------------
+
+func firstCallback(args ...interface{}) interface{} {
+ name := args[0]
+ id := args[1]
+ fmt.Printf("Running first callback - name: %s, id: %s\n", name, id)
+ return nil
+}
+func secondCallback(args ...interface{}) interface{} {
+ name := args[0].(map[string]string)
+ id := args[1]
+ fmt.Printf("Running second callback - name: %s, id: %f\n", name["name"], id)
+ panic("Generating a panic in second callback")
+ return nil
+}
+func thirdCallback(args ...interface{}) interface{} {
+ name := args[0]
+ id := args[1].(*voltha.Device)
+ fmt.Printf("Running third callback - name: %+v, id: %s\n", name, id.Id)
+ return nil
+}
+
+func Test_Proxy_Callbacks_1_Register(t *testing.T) {
+ pt.Proxy.RegisterCallback(PRE_ADD, firstCallback, "abcde", "12345")
+
+ m := make(map[string]string)
+ m["name"] = "fghij"
+ pt.Proxy.RegisterCallback(PRE_ADD, secondCallback, m, 1.2345)
+
+ d := &voltha.Device{Id: "12345"}
+ pt.Proxy.RegisterCallback(PRE_ADD, thirdCallback, "klmno", d)
+}
+
+func Test_Proxy_Callbacks_2_Invoke_WithNoInterruption(t *testing.T) {
+ pt.Proxy.InvokeCallbacks(PRE_ADD, nil, true)
+}
+func Test_Proxy_Callbacks_3_Invoke_WithInterruption(t *testing.T) {
+ pt.Proxy.InvokeCallbacks(PRE_ADD, nil, false)
+}
+
+func Test_Proxy_Callbacks_4_Unregister(t *testing.T) {
+ pt.Proxy.UnregisterCallback(PRE_ADD, firstCallback)
+ pt.Proxy.UnregisterCallback(PRE_ADD, secondCallback)
+ pt.Proxy.UnregisterCallback(PRE_ADD, thirdCallback)
+}
diff --git a/db/model/root.go b/db/model/root.go
index 5adb99d..d10e9f1 100644
--- a/db/model/root.go
+++ b/db/model/root.go
@@ -31,8 +31,8 @@
KvStore *Backend
Loading bool
RevisionClass interface{}
- Callbacks []func()
- NotificationCallbacks []func()
+ Callbacks []CallbackTuple
+ NotificationCallbacks []CallbackTuple
}
func NewRoot(initialData interface{}, kvStore *Backend, revisionClass interface{}) *Root {
@@ -44,8 +44,8 @@
revisionClass = reflect.TypeOf(PersistedRevision{})
}
root.RevisionClass = revisionClass
- root.Callbacks = []func(){}
- root.NotificationCallbacks = []func(){}
+ root.Callbacks = []CallbackTuple{}
+ root.NotificationCallbacks = []CallbackTuple{}
root.Node = NewNode(root, initialData, false, "")
@@ -88,12 +88,12 @@
for len(r.Callbacks) > 0 {
callback := r.Callbacks[0]
r.Callbacks = r.Callbacks[1:]
- callback()
+ callback.Execute(nil)
}
for len(r.NotificationCallbacks) > 0 {
callback := r.NotificationCallbacks[0]
r.NotificationCallbacks = r.NotificationCallbacks[1:]
- callback()
+ callback.Execute(nil)
}
}
@@ -101,11 +101,11 @@
return len(r.Callbacks) == 0
}
-func (r *Root) addCallback(callback func()) {
- r.Callbacks = append(r.Callbacks, callback)
+func (r *Root) addCallback(callback CallbackFunction, args ...interface{}) {
+ r.Callbacks = append(r.Callbacks, CallbackTuple{callback, args})
}
-func (r *Root) addNotificationCallback(callback func()) {
- r.NotificationCallbacks = append(r.NotificationCallbacks, callback)
+func (r *Root) addNotificationCallback(callback CallbackFunction, args ...interface{}) {
+ r.NotificationCallbacks = append(r.NotificationCallbacks, CallbackTuple{callback, args})
}
func (r *Root) Update(path string, data interface{}, strict bool, txid string, makeBranch t_makeBranch) Revision {
@@ -200,7 +200,7 @@
return r
}
-func (r *Root) MakeLatest(branch *Branch, revision Revision, changeAnnouncement map[CallbackType][]interface{}) {
+func (r *Root) MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
r.Node.MakeLatest(branch, revision, changeAnnouncement)
if r.KvStore != nil && branch.Txid == "" {
diff --git a/db/model/utils_test.go b/db/model/utils_test.go
index caf540c..644e6e2 100644
--- a/db/model/utils_test.go
+++ b/db/model/utils_test.go
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package model
import (