VOL-1387 : Added watch mechanism
- Fixed a few failure cases
- Adjusted a few logs
Change-Id: Ied1ecb3d8996a338eee00e9643685482700e860b
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 69db753..dd24e7e 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -21,6 +21,7 @@
"compress/gzip"
"github.com/golang/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/kvstore"
"reflect"
"runtime/debug"
"strings"
@@ -29,10 +30,12 @@
// PersistedRevision holds information of revision meant to be saved in a persistent storage
type PersistedRevision struct {
- mutex sync.RWMutex
Revision
Compress bool
- kvStore *Backend
+
+ events chan *kvstore.Event `json:"-"`
+ kvStore *Backend `json:"-"`
+ mutex sync.RWMutex `json:"-"`
}
// NewPersistedRevision creates a new instance of a PersistentRevision structure
@@ -59,9 +62,8 @@
}
if pair, _ := pr.kvStore.Get(pr.GetHash()); pair != nil && skipOnExist {
- log.Debugf("Config already exists - hash:%s, stack: %s", pr.GetConfig().Hash, string(debug.Stack()))
+ log.Debugw("revision-config-already-exists", log.Fields{"hash": pr.GetHash()})
return
- //}
}
if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
@@ -76,17 +78,69 @@
}
if err := pr.kvStore.Put(pr.GetHash(), blob); err != nil {
- log.Warnf("Problem storing revision config - error: %s, hash: %s, data: %+v", err.Error(),
- pr.GetHash(),
- pr.GetConfig().Data)
+ log.Warnw("problem-storing-revision-config",
+ log.Fields{"error": err, "hash": pr.GetHash(), "data": pr.GetConfig().Data})
} else {
- log.Debugf("Stored config - hash:%s, blob: %+v, stack: %s", pr.GetHash(), pr.GetConfig().Data,
- string(debug.Stack()))
+ log.Debugw("storing-revision-config",
+ log.Fields{"hash": pr.GetHash(), "data": pr.GetConfig().Data})
}
}
}
+func (pr *PersistedRevision) SetupWatch(key string) {
+ if pr.events == nil {
+ pr.events = make(chan *kvstore.Event)
+
+ log.Debugw("setting-watch", log.Fields{"key": key})
+
+ pr.events = pr.kvStore.CreateWatch(key)
+
+ // Start watching
+ go pr.startWatching()
+ }
+}
+
+func (pr *PersistedRevision) startWatching() {
+ log.Debugw("starting-watch", log.Fields{"key": pr.GetHash()})
+
+StopWatchLoop:
+ for {
+ select {
+ case event, ok := <-pr.events:
+ if !ok {
+ log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash()})
+ break StopWatchLoop
+ }
+
+ log.Debugw("received-event", log.Fields{"type": event.EventType})
+
+ switch event.EventType {
+ case kvstore.DELETE:
+ log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash()})
+ pr.Revision.Drop("", true)
+ break StopWatchLoop
+
+ case kvstore.PUT:
+ log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash()})
+
+ if dataPair, err := pr.kvStore.Get(pr.GetHash()); err != nil || dataPair == nil {
+ log.Errorw("update-in-memory--key-retrieval-failed", log.Fields{"key": pr.GetHash(), "error": err})
+ } else {
+ pr.UpdateData(dataPair.Value, pr.GetBranch())
+ }
+
+ default:
+ log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "type": event.EventType})
+ }
+ }
+ }
+
+ log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash()})
+}
+
func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
+ log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
+
var response []Revision
var rev Revision
@@ -129,6 +183,10 @@
childRev := rev.GetBranch().Node.MakeNode(data.Interface(), txid).Latest(txid)
childRev.SetHash(name + "/" + key.String())
+
+ // Create watch for <component>/<key>
+ pr.SetupWatch(childRev.GetHash())
+
children = append(children, childRev)
rev = rev.UpdateChildren(name, children, rev.GetBranch())
@@ -174,9 +232,10 @@
// UpdateData modifies the information in the data model and saves it in the persistent storage
func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
+ log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
+
newNPR := pr.Revision.UpdateData(data, branch)
- log.Debugf("Updating data %+v", data)
newPR := &PersistedRevision{
Revision: newNPR,
Compress: pr.Compress,
@@ -188,6 +247,8 @@
// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
func (pr *PersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
+ log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
+
newNPR := pr.Revision.UpdateChildren(name, children, branch)
newPR := &PersistedRevision{
@@ -201,6 +262,8 @@
// UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
+ log.Debugw("updating-all-persisted-children", log.Fields{"hash": pr.GetHash()})
+
newNPR := pr.Revision.UpdateAllChildren(children, branch)
newPR := &PersistedRevision{
@@ -215,29 +278,29 @@
// Drop takes care of eliminating a revision hash that is no longer needed
// and its associated config when required
func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
+ log.Debugw("dropping-revision",
+ log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash, "stack": string(debug.Stack())})
+
pr.mutex.Lock()
defer pr.mutex.Unlock()
if pr.kvStore != nil && txid == "" {
if includeConfig {
- log.Debugf("removing rev config - hash: %s", pr.GetConfig().Hash)
if err := pr.kvStore.Delete(pr.GetConfig().Hash); err != nil {
- log.Errorf(
- "failed to remove rev config - hash: %s, err: %s",
- pr.GetConfig().Hash,
- err.Error(),
- )
+ log.Errorw("failed-to-remove-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "error": err.Error()})
}
}
- log.Debugf("removing rev - hash: %s", pr.GetHash())
if err := pr.kvStore.Delete(pr.GetHash()); err != nil {
- log.Errorf("failed to remove rev - hash: %s, err: %s", pr.GetHash(), err.Error())
+ log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
}
+
+ pr.kvStore.DeleteWatch(pr.GetConfig().Hash, pr.events)
+
} else {
if includeConfig {
- log.Debugf("Attempted to remove revision config:%s linked to transaction:%s", pr.GetConfig().Hash, txid)
+ log.Debugw("attempted-to-remove-transacted-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "txid": txid})
}
- log.Debugf("Attempted to remove revision:%s linked to transaction:%s", pr.GetHash(), txid)
+ log.Debugw("attempted-to-remove-transacted-revision", log.Fields{"hash": pr.GetHash(), "txid": txid})
}
pr.Revision.Drop(txid, includeConfig)