VOL-1176: Ported callback mechanism
- Updated some files with the license block
Change-Id: I61db400b3a72c4915f4f3f17cc7a110313c2d25e
diff --git a/db/model/proxy.go b/db/model/proxy.go
index 8085eaf..79087a3 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -16,9 +16,13 @@
package model
import (
- "context"
"fmt"
"strings"
+ "reflect"
+ "crypto/md5"
+ "github.com/opencord/voltha-go/common/log"
+ "errors"
+ "runtime"
)
type OperationContext struct {
@@ -48,16 +52,17 @@
Node *Node
Path string
Exclusive bool
- Callbacks []interface{}
+ Callbacks map[CallbackType]map[string]CallbackTuple
}
+// NewProxy returns a Proxy bound to the given root, node and path, with the
+// requested exclusivity flag and an empty callback registry.
func NewProxy(root *Root, node *Node, path string, exclusive bool) *Proxy {
+ // Allocate the outer registry map up front; the per-type inner maps are
+ // created on demand by RegisterCallback.
+ callbacks := make(map[CallbackType]map[string]CallbackTuple)
p := &Proxy{
Root: root,
Node: node,
Exclusive: exclusive,
Path: path,
- Callbacks: []interface{}{},
+ Callbacks: callbacks,
}
return p
}
@@ -121,11 +126,81 @@
p.Root.DeleteTxBranch(txid)
}
-func (p *Proxy) RegisterCallback(callbackType CallbackType, callback func(), args ...interface{}) {
+//type CallbackFunction func(context context.Context, args ...interface{})
+
+// CallbackFunction is the signature of a function that can be registered on a
+// Proxy. It receives a variadic argument list and returns a single value.
+type CallbackFunction func(args ...interface{}) interface{}
+
+// CallbackTuple pairs a callback with the extra arguments that are passed to
+// it each time it is executed.
+type CallbackTuple struct {
+ // callback is the function to call.
+ callback CallbackFunction
+ // args are appended after the context when the callback is executed.
+ args []interface{}
}
-func (p *Proxy) UnregisterCallback(callbackType CallbackType, callback func(), args ...interface{}) {
+// Execute calls the stored callback and returns whatever it returns.
+//
+// The callback receives context as its first argument when context is
+// non-nil, followed by the arguments stored in the tuple. A nil context is
+// omitted entirely, so the callback then sees only the stored arguments.
+func (tuple *CallbackTuple) Execute(context interface{}) interface{} {
+	// Room for the optional context plus every stored argument.
+	params := make([]interface{}, 0, len(tuple.args)+1)
+	if context != nil {
+		params = append(params, context)
+	}
+	params = append(params, tuple.args...)
+	return tuple.callback(params...)
}
-func (p *Proxy) InvokeCallback(callbackType CallbackType, context context.Context, proceedOnError bool) {
+// RegisterCallback stores callback, together with args, under callbackType.
+//
+// The entry is keyed by the first 12 hex digits of the MD5 digest of the
+// function's runtime name, so registering a function with the same name again
+// for the same callback type replaces the earlier entry.
+//
+// A nil callback is rejected with an error log instead of being stored, since
+// it could never be invoked successfully. The registry is created on demand,
+// so a Proxy that was not built through NewProxy can still register callbacks.
+//
+// NOTE(review): the code pointer used to derive the name does not necessarily
+// identify a single function uniquely (see reflect.Value.Pointer), so distinct
+// closures or method values may end up sharing one key - confirm this is
+// acceptable for the callers.
+func (p *Proxy) RegisterCallback(callbackType CallbackType, callback CallbackFunction, args ...interface{}) {
+	if callback == nil {
+		log.Errorf("cannot register a nil callback for callback type - %s", callbackType.String())
+		return
+	}
+	// Writing to a nil map panics; allocate the registry lazily.
+	if p.Callbacks == nil {
+		p.Callbacks = make(map[CallbackType]map[string]CallbackTuple)
+	}
+	if _, exists := p.Callbacks[callbackType]; !exists {
+		p.Callbacks[callbackType] = make(map[string]CallbackTuple)
+	}
+	funcName := runtime.FuncForPC(reflect.ValueOf(callback).Pointer()).Name()
+	log.Debugf("value of function: %s", funcName)
+	funcHash := fmt.Sprintf("%x", md5.Sum([]byte(funcName)))[:12]
+
+	p.Callbacks[callbackType][funcHash] = CallbackTuple{callback: callback, args: args}
+}
+
+// UnregisterCallback removes the entry for callback from the callbacks
+// registered under callbackType.
+//
+// The entry is located with the same key RegisterCallback uses: the first 12
+// hex digits of the MD5 digest of the function's runtime name. An error is
+// logged, and nothing is removed, when either the callback type or the
+// function's key is unknown. args is not used by this method.
+func (p *Proxy) UnregisterCallback(callbackType CallbackType, callback CallbackFunction, args ...interface{}) {
+	registered, known := p.Callbacks[callbackType]
+	if !known {
+		log.Errorf("no such callback type - %s", callbackType.String())
+		return
+	}
+
+	// TODO: Not sure if this is the best way to do it.
+	name := runtime.FuncForPC(reflect.ValueOf(callback).Pointer()).Name()
+	log.Debugf("value of function: %s", name)
+
+	digest := md5.Sum([]byte(name))
+	key := fmt.Sprintf("%x", digest)[:12]
+	if _, present := registered[key]; !present {
+		log.Errorf("function with hash value: '%s' not registered with callback type: '%s'", key, callbackType)
+		return
+	}
+	delete(registered, key)
+}
+
+// invoke executes a single callback tuple with the given context.
+//
+// A panic raised while the callback runs is recovered, logged, and reported
+// through err; in that case result is nil because the callback never
+// returned a value.
+func (p *Proxy) invoke(callback CallbackTuple, context interface{}) (result interface{}, err error) {
+	// The results are named so the deferred handler can set err after a panic.
+	defer func() {
+		recovered := recover()
+		if recovered == nil {
+			return
+		}
+		message := fmt.Sprintf("callback error occurred: %+v", recovered)
+		log.Error(message)
+		err = errors.New(message)
+	}()
+
+	result = callback.Execute(context)
+	return result, err
+}
+
+// InvokeCallbacks runs the callbacks registered for a callback type, feeding
+// the value returned by each successful callback to the next one as its
+// context, and returns the final context.
+//
+// args is positional and must contain at least, in this order:
+//
+//	args[0] CallbackType - the callback type whose callbacks are run
+//	args[1] interface{}  - the initial context (may be nil)
+//	args[2] bool         - proceedOnError
+//
+// which mirrors the typed form
+// InvokeCallbacks(callbackType, context, proceedOnError).
+//
+// If args is too short or holds the wrong types, an error is logged and nil
+// is returned without running any callback. When a callback fails, the
+// context it was given is kept: it is handed to the next callback if
+// proceedOnError is true, and returned as-is otherwise.
+//
+// The callbacks live in a map, so the order in which they run is unspecified.
+func (p *Proxy) InvokeCallbacks(args ...interface{}) interface{} {
+	if len(args) < 3 {
+		log.Errorf("InvokeCallbacks expects (CallbackType, context, bool), got %d argument(s)", len(args))
+		return nil
+	}
+	callbackType, ok := args[0].(CallbackType)
+	if !ok {
+		log.Errorf("InvokeCallbacks: first argument must be a CallbackType, got %T", args[0])
+		return nil
+	}
+	proceedOnError, ok := args[2].(bool)
+	if !ok {
+		log.Errorf("InvokeCallbacks: third argument must be a bool, got %T", args[2])
+		return nil
+	}
+	context := args[1]
+
+	// Ranging over a missing (nil) inner map is a no-op, so an unknown
+	// callback type simply returns the context unchanged.
+	for _, callback := range p.Callbacks[callbackType] {
+		result, err := p.invoke(callback, context)
+		if err != nil {
+			if !proceedOnError {
+				log.Info("An error occurred. Stopping callback invocation")
+				break
+			}
+			log.Info("An error occurred. Invoking next callback")
+			// Keep the previous context; a failed callback has no result.
+			continue
+		}
+		context = result
+	}
+	return context
}