VOL-1176: Ported callback mechanism
- Updated some files with the license block

Change-Id: I61db400b3a72c4915f4f3f17cc7a110313c2d25e
diff --git a/db/model/event_bus.go b/db/model/event_bus.go
index c20eac3..6afc63b 100644
--- a/db/model/event_bus.go
+++ b/db/model/event_bus.go
@@ -44,7 +44,12 @@
 	return bus
 }
 
-func (bus *EventBus) Advertise(eventType CallbackType, data interface{}, hash string) {
+//func (bus *EventBus) Advertise(eventType CallbackType, data interface{}, hash string) {
+func (bus *EventBus) Advertise(args ...interface{}) interface{} {
+	eventType := args[0].(CallbackType)
+	data := args[1]
+	hash := args[2].(string)
+
 	if _, ok := IGNORED_CALLBACKS[eventType]; ok {
 		fmt.Printf("ignoring event - type:%s, data:%+v\n", eventType, data)
 	}
@@ -75,4 +80,6 @@
 	}
 
 	bus.client.Publish(bus.topic, event)
+
+	return nil
 }
diff --git a/db/model/merge.go b/db/model/merge.go
index 8ff89c4..97257b4 100644
--- a/db/model/merge.go
+++ b/db/model/merge.go
@@ -87,7 +87,7 @@
 func Merge3Way(
 	forkRev, srcRev, dstRev Revision,
 	mergeChildFunc func(Revision) Revision,
-	dryRun bool) (rev Revision, changes map[CallbackType][]interface{}) {
+	dryRun bool) (rev Revision, changes []ChangeTuple) {
 
 	var configChanged bool
 
@@ -124,9 +124,9 @@
 						newChildren[fieldName] = append(newChildren[fieldName], mergeChildFunc(rev))
 					}
 					if field.IsContainer {
-						changes[POST_LISTCHANGE] = append(
-							changes[POST_LISTCHANGE],
-							NewOperationContext("", nil, fieldName, ""),
+						changes = append(
+							changes, ChangeTuple{POST_LISTCHANGE,
+							NewOperationContext("", nil, fieldName, "")},
 						)
 					}
 				}
@@ -146,18 +146,11 @@
 					newRev := mergeChildFunc(newList[idx])
 					newList[idx] = newRev
 
-					changes[POST_ADD] = append(
-						changes[POST_ADD],
-						newRev.GetData(),
-					)
+					changes = append(changes, ChangeTuple{POST_ADD,newRev.GetData()})
 				}
 				for key, _ := range src.RemovedKeys {
 					oldRev := forkList[src.KeyMap1[key]]
-
-					changes[POST_REMOVE] = append(
-						changes[POST_REMOVE],
-						oldRev.GetData(),
-					)
+					changes = append(changes, ChangeTuple{POST_REMOVE,oldRev.GetData()})
 				}
 				for key, _ := range src.ChangedKeys {
 					idx := src.KeyMap2[key]
@@ -184,10 +177,7 @@
 					} else {
 						newRev := mergeChildFunc(srcList[src.KeyMap2[key]])
 						newList = append(newList, newRev)
-						changes[POST_ADD] = append(
-							changes[POST_ADD],
-							newRev.GetData(),
-						)
+						changes = append(changes, ChangeTuple{POST_ADD,newRev.GetData()})
 					}
 				}
 				for key, _ := range src.ChangedKeys {
@@ -223,10 +213,7 @@
 						newList[len(newList)-1] = nil
 						newList = newList[:len(newList)-1]
 
-						changes[POST_REMOVE] = append(
-							changes[POST_REMOVE],
-							oldRev.GetData(),
-						)
+						changes = append(changes, ChangeTuple{POST_REMOVE,oldRev.GetData()})
 					}
 				}
 
@@ -246,10 +233,7 @@
 		rev = rev.UpdateAllChildren(newChildren, dstRev.GetBranch())
 
 		if configChanged {
-			changes[POST_UPDATE] = append(
-				changes[POST_UPDATE],
-				rev.GetData(),
-			)
+			changes = append(changes, ChangeTuple{POST_UPDATE,rev.GetData()})
 		}
 		return rev, changes
 	}
diff --git a/db/model/node.go b/db/model/node.go
index 90ab666..77e0cc7 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -37,6 +37,11 @@
 	AutoPrune bool
 }
 
+type ChangeTuple struct {
+	Type CallbackType
+	Data interface{}
+}
+
 func NewNode(root *Root, initialData interface{}, autoPrune bool, txid string) *Node {
 	n := &Node{}
 
@@ -69,7 +74,7 @@
 	return n.root.MakeRevision(branch, data, children)
 }
 
-func (n *Node) MakeLatest(branch *Branch, revision Revision, changeAnnouncement map[CallbackType][]interface{}) {
+func (n *Node) MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
 	if _, ok := branch.Revisions[revision.GetHash()]; !ok {
 		branch.Revisions[revision.GetHash()] = revision
 	}
@@ -80,15 +85,17 @@
 
 	if changeAnnouncement != nil && branch.Txid == "" {
 		if n.Proxy != nil {
-			for changeType, data := range changeAnnouncement {
+			for _, change := range changeAnnouncement {
 				// TODO: Invoke callback
-				fmt.Printf("invoking callback - changeType: %+v, data:%+v\n", changeType, data)
+				fmt.Printf("invoking callback - changeType: %+v, data:%+v\n", change.Type, change.Data)
+				n.root.addCallback(n.Proxy.InvokeCallbacks, change.Type, change.Data, true)
 			}
 		}
 
-		for changeType, data := range changeAnnouncement {
+		for _, change := range changeAnnouncement {
 			// TODO: send notifications
-			fmt.Printf("sending notification - changeType: %+v, data:%+v\n", changeType, data)
+			fmt.Printf("sending notification - changeType: %+v, data:%+v\n", change.Type, change.Data)
+			n.root.addNotificationCallback(n.makeEventBus().Advertise, change.Type, change.Data, revision.GetHash())
 		}
 	}
 }
@@ -266,8 +273,9 @@
 	msg := rev.Get(depth)
 
 	if n.Proxy != nil {
-		// TODO: invoke GET callback
-		fmt.Println("invoking proxy GET Callbacks")
+		log.Debug("invoking proxy GET Callbacks")
+		msg = n.Proxy.InvokeCallbacks(GET, msg, false)
+
 	}
 	return msg
 }
@@ -373,8 +381,8 @@
 	//}
 
 	if n.Proxy != nil {
-		// TODO: n.proxy.InvokeCallbacks(CallbackType.PRE_UPDATE, data)
-		fmt.Println("invoking proxy PRE_UPDATE Callbacks")
+		log.Debug("invoking proxy PRE_UPDATE Callbacks")
+		n.Proxy.InvokeCallbacks(PRE_UPDATE, data, false)
 	}
 	if !reflect.DeepEqual(branch.Latest.GetData(), data) {
 		if strict {
@@ -428,8 +436,8 @@
 		if path == "" {
 			if field.Key != "" {
 				if n.Proxy != nil {
-					// TODO -> n.proxy.InvokeCallbacks(PRE_ADD, data)
-					fmt.Println("invoking proxy PRE_ADD Callbacks")
+					log.Debug("invoking proxy PRE_ADD Callbacks")
+					n.Proxy.InvokeCallbacks(PRE_ADD, data, false)
 				}
 
 				for _, v := range rev.GetChildren()[name] {
@@ -508,7 +516,7 @@
 
 	field := ChildrenFields(n.Type)[name]
 	var children []Revision
-	post_anno := make(map[CallbackType][]interface{})
+	postAnnouncement := []ChangeTuple{}
 
 	if field.IsContainer {
 		if path == "" {
@@ -542,14 +550,14 @@
 				idx, childRev := n.findRevByKey(children, field.Key, key)
 				if n.Proxy != nil {
 					data := childRev.GetData()
-					fmt.Println("invoking proxy PRE_REMOVE Callbacks")
-					fmt.Printf("setting POST_REMOVE Callbacks : %+v\n", data)
+					n.Proxy.InvokeCallbacks(PRE_REMOVE, data, false)
+					postAnnouncement = append(postAnnouncement, ChangeTuple{POST_REMOVE, data})
 				} else {
-					fmt.Println("setting POST_REMOVE Callbacks")
+					postAnnouncement = append(postAnnouncement, ChangeTuple{POST_REMOVE, childRev.GetData()})
 				}
 				children = append(children[:idx], children[idx+1:]...)
 				rev := rev.UpdateChildren(name, children, branch)
-				n.root.MakeLatest(branch, rev, post_anno)
+				n.root.MakeLatest(branch, rev, postAnnouncement)
 				return rev
 			}
 		} else {
diff --git a/db/model/proxy.go b/db/model/proxy.go
index 8085eaf..79087a3 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -16,9 +16,13 @@
 package model
 
 import (
-	"context"
 	"fmt"
 	"strings"
+	"reflect"
+	"crypto/md5"
+	"github.com/opencord/voltha-go/common/log"
+	"errors"
+	"runtime"
 )
 
 type OperationContext struct {
@@ -48,16 +52,17 @@
 	Node      *Node
 	Path      string
 	Exclusive bool
-	Callbacks []interface{}
+	Callbacks map[CallbackType]map[string]CallbackTuple
 }
 
 func NewProxy(root *Root, node *Node, path string, exclusive bool) *Proxy {
+	callbacks := make(map[CallbackType]map[string]CallbackTuple)
 	p := &Proxy{
 		Root:      root,
 		Node:      node,
 		Exclusive: exclusive,
 		Path:      path,
-		Callbacks: []interface{}{},
+		Callbacks: callbacks,
 	}
 	return p
 }
@@ -121,11 +126,81 @@
 	p.Root.DeleteTxBranch(txid)
 }
 
-func (p *Proxy) RegisterCallback(callbackType CallbackType, callback func(), args ...interface{}) {
+//type CallbackFunction func(context context.Context, args ...interface{})
+type CallbackFunction func(args ...interface{}) interface{}
+type CallbackTuple struct {
+	callback CallbackFunction
+	args     []interface{}
 }
 
-func (p *Proxy) UnregisterCallback(callbackType CallbackType, callback func(), args ...interface{}) {
+func (tuple *CallbackTuple) Execute(context interface{}) interface{} {
+	newArgs := []interface{}{}
+	if context != nil {
+		newArgs = append(newArgs, context)
+	}
+	newArgs = append(newArgs, tuple.args...)
+	return tuple.callback(newArgs...)
 }
 
-func (p *Proxy) InvokeCallback(callbackType CallbackType, context context.Context, proceedOnError bool) {
+func (p *Proxy) RegisterCallback(callbackType CallbackType, callback CallbackFunction, args ...interface{}) {
+	if _, exists := p.Callbacks[callbackType]; !exists {
+		p.Callbacks[callbackType] = make(map[string]CallbackTuple)
+	}
+	funcName := runtime.FuncForPC(reflect.ValueOf(callback).Pointer()).Name()
+	log.Debugf("value of function: %s", funcName)
+	funcHash := fmt.Sprintf("%x", md5.Sum([]byte(funcName)))[:12]
+
+	p.Callbacks[callbackType][funcHash] = CallbackTuple{callback, args}
+}
+
+func (p *Proxy) UnregisterCallback(callbackType CallbackType, callback CallbackFunction, args ...interface{}) {
+	if _, exists := p.Callbacks[callbackType]; !exists {
+		log.Errorf("no such callback type - %s", callbackType.String())
+		return
+	}
+	// TODO: Not sure if this is the best way to do it.
+	funcName := runtime.FuncForPC(reflect.ValueOf(callback).Pointer()).Name()
+	log.Debugf("value of function: %s", funcName)
+	funcHash := fmt.Sprintf("%x", md5.Sum([]byte(funcName)))[:12]
+	if _, exists := p.Callbacks[callbackType][funcHash]; !exists {
+		log.Errorf("function with hash value: '%s' not registered with callback type: '%s'", funcHash, callbackType)
+		return
+	}
+	delete(p.Callbacks[callbackType], funcHash)
+}
+
+func (p *Proxy) invoke(callback CallbackTuple, context interface{}) (result interface{}, err error) {
+	defer func() {
+		if r := recover(); r != nil {
+			errStr := fmt.Sprintf("callback error occurred: %+v", r)
+			err = errors.New(errStr)
+			log.Error(errStr)
+		}
+	}()
+
+	result = callback.Execute(context)
+
+	return result, err
+}
+
+//func (p *Proxy) InvokeCallbacks(callbackType CallbackType, context context.Context, proceedOnError bool) {
+func (p *Proxy) InvokeCallbacks(args ...interface{}) interface{} {
+	callbackType := args[0].(CallbackType)
+	context := args[1]
+	proceedOnError := args[2].(bool)
+
+	var err error
+
+	if _, exists := p.Callbacks[callbackType]; exists {
+		for _, callback := range p.Callbacks[callbackType] {
+			if context, err = p.invoke(callback, context); err != nil {
+				if !proceedOnError {
+					log.Info("An error occurred.  Stopping callback invocation")
+					break
+				}
+				log.Info("An error occurred.  Invoking next callback")
+			}
+		}
+	}
+	return context
 }
diff --git a/db/model/proxy_test.go b/db/model/proxy_test.go
index 1824d3f..0984016 100644
--- a/db/model/proxy_test.go
+++ b/db/model/proxy_test.go
@@ -24,6 +24,7 @@
 	"github.com/google/uuid"
 	"encoding/hex"
 	"encoding/json"
+	"fmt"
 )
 
 type proxyTest struct {
@@ -46,7 +47,7 @@
 		DbPort:    2379,
 		DbTimeout: 5,
 	}
-	devId string
+	devId          string
 	targetDeviceId string
 )
 
@@ -56,8 +57,6 @@
 	}
 	defer log.CleanUp()
 
-}
-func Test_Proxy_0_GetRootProxy(t *testing.T) {
 	pt.Backend = NewBackend(pt.DbType, pt.DbHost, pt.DbPort, pt.DbTimeout, pt.DbPrefix)
 
 	msgClass := &voltha.Voltha{}
@@ -69,6 +68,18 @@
 	pt.Proxy = pt.Root.Node.GetProxy("/", false)
 }
 
+//func Test_Proxy_0_GetRootProxy(t *testing.T) {
+//	pt.Backend = NewBackend(pt.DbType, pt.DbHost, pt.DbPort, pt.DbTimeout, pt.DbPrefix)
+//
+//	msgClass := &voltha.Voltha{}
+//	root := NewRoot(msgClass, pt.Backend, nil)
+//	pt.Root = root.Load(msgClass)
+//
+//	GetProfiling().Report()
+//
+//	pt.Proxy = pt.Root.Node.GetProxy("/", false)
+//}
+
 func Test_Proxy_1_GetDevices(t *testing.T) {
 	devices := pt.Proxy.Get("/devices", 1, false, "")
 
@@ -83,7 +94,7 @@
 
 func Test_Proxy_2_GetDevice(t *testing.T) {
 	basePath := "/devices/" + targetDeviceId
-	device1 := pt.Proxy.Get(basePath + "/ports", 1, false, "")
+	device1 := pt.Proxy.Get(basePath+"/ports", 1, false, "")
 	t.Logf("retrieved device with ports: %+v", device1)
 
 	device2 := pt.Proxy.Get(basePath, 0, false, "")
@@ -157,10 +168,10 @@
 	devId = "0001" + hex.EncodeToString(devIdBin)[:12]
 
 	device := &voltha.Device{
-		Id:                  devId,
-		Type:                "simulated_olt",
-		Address:             &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
-		AdminState:          voltha.AdminState_PREPROVISIONED,
+		Id:         devId,
+		Type:       "simulated_olt",
+		Address:    &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
+		AdminState: voltha.AdminState_PREPROVISIONED,
 	}
 
 	if added := pt.Proxy.Add("/devices", device, ""); added == nil {
@@ -171,7 +182,7 @@
 }
 
 func Test_Proxy_4_CheckAddedDevice(t *testing.T) {
-	if d := pt.Proxy.Get("/devices/" + devId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+	if d := pt.Proxy.Get("/devices/"+devId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
 		t.Error("Failed to find added device")
 	} else {
 		djson, _ := json.Marshal(d)
@@ -181,7 +192,7 @@
 }
 
 func Test_Proxy_5_UpdateDevice(t *testing.T) {
-	if retrieved := pt.Proxy.Get("/devices/" + targetDeviceId, 1, false, ""); retrieved == nil {
+	if retrieved := pt.Proxy.Get("/devices/"+targetDeviceId, 1, false, ""); retrieved == nil {
 		t.Error("Failed to get device")
 	} else {
 		var fwVersion int
@@ -196,7 +207,7 @@
 		cloned.FirmwareVersion = strconv.Itoa(fwVersion)
 		t.Logf("Before update : %+v", cloned)
 
-		if afterUpdate := pt.Proxy.Update("/devices/" + targetDeviceId, &cloned, false, ""); afterUpdate == nil {
+		if afterUpdate := pt.Proxy.Update("/devices/"+targetDeviceId, &cloned, false, ""); afterUpdate == nil {
 			t.Error("Failed to update device")
 		} else {
 			t.Logf("Updated device : %+v", afterUpdate.(Revision).GetData())
@@ -205,13 +216,13 @@
 }
 
 func Test_Proxy_6_CheckUpdatedDevice(t *testing.T) {
-	device := pt.Proxy.Get("/devices/" + targetDeviceId, 0, false, "")
+	device := pt.Proxy.Get("/devices/"+targetDeviceId, 0, false, "")
 
 	t.Logf("content of updated device: %+v", device)
 }
 
 func Test_Proxy_7_RemoveDevice(t *testing.T) {
-	if removed := pt.Proxy.Remove("/devices/" + devId, ""); removed == nil {
+	if removed := pt.Proxy.Remove("/devices/"+devId, ""); removed == nil {
 		t.Error("Failed to remove device")
 	} else {
 		t.Logf("Removed device : %+v", removed)
@@ -219,10 +230,58 @@
 }
 
 func Test_Proxy_8_CheckRemovedDevice(t *testing.T) {
-	if d := pt.Proxy.Get("/devices/" + devId, 0, false, ""); reflect.ValueOf(d).IsValid() {
+	if d := pt.Proxy.Get("/devices/"+devId, 0, false, ""); reflect.ValueOf(d).IsValid() {
 		djson, _ := json.Marshal(d)
 		t.Errorf("Device was not removed - %s", djson)
 	} else {
 		t.Logf("Device was removed: %s", devId)
 	}
 }
+
+// -----------------------------
+// Callback tests
+// -----------------------------
+
+func firstCallback(args ...interface{}) interface{} {
+	name := args[0]
+	id := args[1]
+	fmt.Printf("Running first callback - name: %s, id: %s\n", name, id)
+	return nil
+}
+func secondCallback(args ...interface{}) interface{} {
+	name := args[0].(map[string]string)
+	id := args[1]
+	fmt.Printf("Running second callback - name: %s, id: %f\n", name["name"], id)
+	panic("Generating a panic in second callback")
+	return nil
+}
+func thirdCallback(args ...interface{}) interface{} {
+	name := args[0]
+	id := args[1].(*voltha.Device)
+	fmt.Printf("Running third callback - name: %+v, id: %s\n", name, id.Id)
+	return nil
+}
+
+func Test_Proxy_Callbacks_1_Register(t *testing.T) {
+	pt.Proxy.RegisterCallback(PRE_ADD, firstCallback, "abcde", "12345")
+
+	m := make(map[string]string)
+	m["name"] = "fghij"
+	pt.Proxy.RegisterCallback(PRE_ADD, secondCallback, m, 1.2345)
+
+	d := &voltha.Device{Id: "12345"}
+	pt.Proxy.RegisterCallback(PRE_ADD, thirdCallback, "klmno", d)
+}
+
+func Test_Proxy_Callbacks_2_Invoke_WithNoInterruption(t *testing.T) {
+	pt.Proxy.InvokeCallbacks(PRE_ADD, nil, true)
+}
+func Test_Proxy_Callbacks_3_Invoke_WithInterruption(t *testing.T) {
+	pt.Proxy.InvokeCallbacks(PRE_ADD, nil, false)
+}
+
+func Test_Proxy_Callbacks_4_Unregister(t *testing.T) {
+	pt.Proxy.UnregisterCallback(PRE_ADD, firstCallback)
+	pt.Proxy.UnregisterCallback(PRE_ADD, secondCallback)
+	pt.Proxy.UnregisterCallback(PRE_ADD, thirdCallback)
+}
diff --git a/db/model/root.go b/db/model/root.go
index 5adb99d..d10e9f1 100644
--- a/db/model/root.go
+++ b/db/model/root.go
@@ -31,8 +31,8 @@
 	KvStore               *Backend
 	Loading               bool
 	RevisionClass         interface{}
-	Callbacks             []func()
-	NotificationCallbacks []func()
+	Callbacks             []CallbackTuple
+	NotificationCallbacks []CallbackTuple
 }
 
 func NewRoot(initialData interface{}, kvStore *Backend, revisionClass interface{}) *Root {
@@ -44,8 +44,8 @@
 		revisionClass = reflect.TypeOf(PersistedRevision{})
 	}
 	root.RevisionClass = revisionClass
-	root.Callbacks = []func(){}
-	root.NotificationCallbacks = []func(){}
+	root.Callbacks = []CallbackTuple{}
+	root.NotificationCallbacks = []CallbackTuple{}
 
 	root.Node = NewNode(root, initialData, false, "")
 
@@ -88,12 +88,12 @@
 	for len(r.Callbacks) > 0 {
 		callback := r.Callbacks[0]
 		r.Callbacks = r.Callbacks[1:]
-		callback()
+		callback.Execute(nil)
 	}
 	for len(r.NotificationCallbacks) > 0 {
 		callback := r.NotificationCallbacks[0]
 		r.NotificationCallbacks = r.NotificationCallbacks[1:]
-		callback()
+		callback.Execute(nil)
 	}
 }
 
@@ -101,11 +101,11 @@
 	return len(r.Callbacks) == 0
 }
 
-func (r *Root) addCallback(callback func()) {
-	r.Callbacks = append(r.Callbacks, callback)
+func (r *Root) addCallback(callback CallbackFunction, args ...interface{}) {
+	r.Callbacks = append(r.Callbacks, CallbackTuple{callback, args})
 }
-func (r *Root) addNotificationCallback(callback func()) {
-	r.NotificationCallbacks = append(r.NotificationCallbacks, callback)
+func (r *Root) addNotificationCallback(callback CallbackFunction, args ...interface{}) {
+	r.NotificationCallbacks = append(r.NotificationCallbacks, CallbackTuple{callback, args})
 }
 
 func (r *Root) Update(path string, data interface{}, strict bool, txid string, makeBranch t_makeBranch) Revision {
@@ -200,7 +200,7 @@
 	return r
 }
 
-func (r *Root) MakeLatest(branch *Branch, revision Revision, changeAnnouncement map[CallbackType][]interface{}) {
+func (r *Root) MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
 	r.Node.MakeLatest(branch, revision, changeAnnouncement)
 
 	if r.KvStore != nil && branch.Txid == "" {
diff --git a/db/model/utils_test.go b/db/model/utils_test.go
index caf540c..644e6e2 100644
--- a/db/model/utils_test.go
+++ b/db/model/utils_test.go
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package model
 
 import (