VOL-1775 VOL-1779 VOL-1780 : Fix several issues affecting overall stability
- Fixed data races reported by the Go race detector
- Added a version attribute to the KV object
- Added a context object to the db/model API
- Carried timestamp information through the context to help decide
  whether a revision change should be applied
- Replaced the proxy access control mechanism with the etcd reservation mechanism
Change-Id: If3d142a73b1da0d64fa6a819530f297dbfada2d3
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 2ab91b7..d2d228f 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -19,7 +19,9 @@
import (
"bytes"
"compress/gzip"
+ "context"
"github.com/golang/protobuf/proto"
+ "github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
"reflect"
@@ -32,11 +34,13 @@
Revision
Compress bool
- events chan *kvstore.Event
- kvStore *Backend
- mutex sync.RWMutex
- isStored bool
- isWatched bool
+ events chan *kvstore.Event
+ kvStore *Backend
+ mutex sync.RWMutex
+ versionMutex sync.RWMutex
+ Version int64
+ isStored bool
+ isWatched bool
}
type watchCache struct {
@@ -57,10 +61,23 @@
func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
pr := &PersistedRevision{}
pr.kvStore = branch.Node.GetRoot().KvStore
+ // Every new persisted revision starts at version 1. Watch events and
+ // persisted entries are only applied to a revision when they carry a
+ // higher version than its current Version.
+ // NOTE(review): presumably mirrors the initial version the KV store
+ // assigns to a newly created key — confirm.
+ pr.Version = 1
pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
return pr
}
+// getVersion returns the revision's Version, read while holding the read
+// lock of versionMutex so it may run concurrently with setVersion.
+// Struct literals that copy Version into a new revision use this accessor
+// rather than reading the field directly.
+func (pr *PersistedRevision) getVersion() int64 {
+ pr.versionMutex.RLock()
+ defer pr.versionMutex.RUnlock()
+ return pr.Version
+}
+
+// setVersion overwrites the revision's Version while holding the write lock
+// of versionMutex. It performs no ordering check: callers that need the
+// version to only move forward must compare against getVersion first.
+// NOTE(review): a getVersion check followed by a separate setVersion call
+// is not atomic; two concurrent updaters could interleave between them.
+func (pr *PersistedRevision) setVersion(version int64) {
+ pr.versionMutex.Lock()
+ defer pr.versionMutex.Unlock()
+ pr.Version = version
+}
+
// Finalize is responsible of saving the revision in the persistent storage
func (pr *PersistedRevision) Finalize(skipOnExist bool) {
pr.store(skipOnExist)
@@ -73,8 +90,12 @@
log.Debugw("ready-to-store-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
- if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
- // TODO report error
+ // clone the revision data to avoid any race conditions with processes
+ // accessing the same data
+ cloned := proto.Clone(pr.GetConfig().Data.(proto.Message))
+
+ if blob, err := proto.Marshal(cloned); err != nil {
+ log.Errorw("problem-to-marshal", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
} else {
if pr.Compress {
var b bytes.Buffer
@@ -84,10 +105,11 @@
blob = b.Bytes()
}
+ GetRevCache().Set(pr.GetName(), pr)
if err := pr.kvStore.Put(pr.GetName(), blob); err != nil {
log.Warnw("problem-storing-revision", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
} else {
- log.Debugw("storing-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
+ log.Debugw("storing-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data, "version": pr.getVersion()})
pr.isStored = true
}
}
@@ -145,6 +167,20 @@
case kvstore.PUT:
log.Debugw("update-in-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
+ if latestRev.getVersion() >= event.Version {
+ log.Debugw("skipping-matching-or-older-revision", log.Fields{
+ "watch": latestRev.GetName(),
+ "watch-version": event.Version,
+ "latest-version": latestRev.getVersion(),
+ })
+ continue
+ } else {
+ log.Debugw("watch-revision-is-newer", log.Fields{
+ "watch": latestRev.GetName(),
+ "watch-version": event.Version,
+ "latest-version": latestRev.getVersion(),
+ })
+ }
data := reflect.New(reflect.TypeOf(latestRev.GetData()).Elem())
@@ -154,7 +190,6 @@
log.Debugw("un-marshaled-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "data": data.Interface()})
var pathLock string
- var pac *proxyAccessControl
var blobs map[string]*kvstore.KVPair
// The watch reported new persistence data.
@@ -166,6 +201,7 @@
Value: event.Value,
Session: "",
Lease: 0,
+ Version: event.Version,
}
if latestRev.GetNode().GetProxy() != nil {
@@ -173,82 +209,35 @@
// If a proxy exists for this revision, use it to lock access to the path
// and prevent simultaneous updates to the object in memory
//
- pathLock, _ = latestRev.GetNode().GetProxy().parseForControlledPath(latestRev.GetNode().GetProxy().getFullPath())
//If the proxy already has a request in progress, then there is no need to process the watch
- log.Debugw("checking-if-path-is-locked", log.Fields{"key": latestRev.GetHash(), "pathLock": pathLock})
- if PAC().IsReserved(pathLock) {
+ if latestRev.GetNode().GetProxy().GetOperation() != PROXY_NONE {
log.Debugw("operation-in-progress", log.Fields{
"key": latestRev.GetHash(),
"path": latestRev.GetNode().GetProxy().getFullPath(),
- "operation": latestRev.GetNode().GetProxy().Operation.String(),
+ "operation": latestRev.GetNode().GetProxy().operation.String(),
})
-
- //continue
-
- // Identify the operation type and determine if the watch event should be applied or not.
- switch latestRev.GetNode().GetProxy().Operation {
- case PROXY_REMOVE:
- fallthrough
-
- case PROXY_ADD:
- fallthrough
-
- case PROXY_UPDATE:
- // We will need to reload once the operation completes.
- // Therefore, the data of the current event is most likely out-dated
- // and should be ignored
- log.Debugw("ignore-watch-event", log.Fields{
- "key": latestRev.GetHash(),
- "path": latestRev.GetNode().GetProxy().getFullPath(),
- "operation": latestRev.GetNode().GetProxy().Operation.String(),
- })
-
- continue
-
- case PROXY_CREATE:
- fallthrough
-
- case PROXY_LIST:
- fallthrough
-
- case PROXY_GET:
- fallthrough
-
- case PROXY_WATCH:
- fallthrough
-
- default:
- log.Debugw("process-watch-event", log.Fields{
- "key": latestRev.GetHash(),
- "path": latestRev.GetNode().GetProxy().getFullPath(),
- "operation": latestRev.GetNode().GetProxy().Operation.String(),
- })
- }
+ continue
}
+ pathLock, _ = latestRev.GetNode().GetProxy().parseForControlledPath(latestRev.GetNode().GetProxy().getFullPath())
+
// Reserve the path to prevent others to modify while we reload from persistence
- log.Debugw("reserve-and-lock-path", log.Fields{"key": latestRev.GetHash(), "path": pathLock})
- pac = PAC().ReservePath(latestRev.GetNode().GetProxy().getFullPath(),
- latestRev.GetNode().GetProxy(), pathLock)
- pac.lock()
- latestRev.GetNode().GetProxy().Operation = PROXY_WATCH
- pac.SetProxy(latestRev.GetNode().GetProxy())
+ latestRev.GetNode().GetProxy().GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
+ latestRev.GetNode().GetProxy().SetOperation(PROXY_WATCH)
// Load changes and apply to memory
- latestRev.LoadFromPersistence(latestRev.GetName(), "", blobs)
+ latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs)
- log.Debugw("release-and-unlock-path", log.Fields{"key": latestRev.GetHash(), "path": pathLock})
- pac.getProxy().Operation = PROXY_GET
- pac.unlock()
- PAC().ReleasePath(pathLock)
+ // Release path
+ latestRev.GetNode().GetProxy().GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
} else {
// This block should be reached only if coming from a non-proxied request
log.Debugw("revision-with-no-proxy", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
// Load changes and apply to memory
- latestRev.LoadFromPersistence(latestRev.GetName(), "", blobs)
+ latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs)
}
}
@@ -264,16 +253,17 @@
}
// UpdateData modifies the information in the data model and saves it in the persistent storage
-func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
+func (pr *PersistedRevision) UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision {
log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
- newNPR := pr.Revision.UpdateData(data, branch)
+ newNPR := pr.Revision.UpdateData(ctx, data, branch)
newPR := &PersistedRevision{
Revision: newNPR,
Compress: pr.Compress,
kvStore: pr.kvStore,
events: pr.events,
+ Version: pr.getVersion(),
isWatched: pr.isWatched,
}
@@ -289,17 +279,17 @@
}
// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
-func (pr *PersistedRevision) UpdateChildren(name string, children []Revision,
- branch *Branch) Revision {
+func (pr *PersistedRevision) UpdateChildren(ctx context.Context, name string, children []Revision, branch *Branch) Revision {
log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
- newNPR := pr.Revision.UpdateChildren(name, children, branch)
+ newNPR := pr.Revision.UpdateChildren(ctx, name, children, branch)
newPR := &PersistedRevision{
Revision: newNPR,
Compress: pr.Compress,
kvStore: pr.kvStore,
events: pr.events,
+ Version: pr.getVersion(),
isWatched: pr.isWatched,
}
@@ -324,6 +314,7 @@
Compress: pr.Compress,
kvStore: pr.kvStore,
events: pr.events,
+ Version: pr.getVersion(),
isWatched: pr.isWatched,
}
@@ -346,8 +337,7 @@
// Drop takes care of eliminating a revision hash that is no longer needed
// and its associated config when required
func (pr *PersistedRevision) StorageDrop(txid string, includeConfig bool) {
- log.Debugw("dropping-revision",
- log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash})
+ log.Debugw("dropping-revision", log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash})
pr.mutex.Lock()
defer pr.mutex.Unlock()
@@ -376,9 +366,10 @@
}
// verifyPersistedEntry validates if the provided data is available or not in memory and applies updates as required
-func (pr *PersistedRevision) verifyPersistedEntry(data interface{}, typeName string, keyName string, keyValue string, txid string) (response Revision) {
+func (pr *PersistedRevision) verifyPersistedEntry(ctx context.Context, data interface{}, typeName string, keyName string,
+ keyValue string, txid string, version int64) (response Revision) {
// Parent which holds the current node entry
- parent := pr.GetBranch().Node.Root
+ parent := pr.GetBranch().Node.GetRoot()
// Get a copy of the parent's children
children := make([]Revision, len(parent.GetBranch(NONE).Latest.GetChildren(typeName)))
@@ -389,11 +380,12 @@
// A child matching the provided key exists in memory
// Verify if the data differs from what was retrieved from persistence
// Also check if we are treating a newer revision of the data or not
- if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() {
+ if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() && childRev.getVersion() < version {
log.Debugw("revision-data-is-different", log.Fields{
- "key": childRev.GetHash(),
- "name": childRev.GetName(),
- "data": childRev.GetData(),
+ "key": childRev.GetHash(),
+ "name": childRev.GetName(),
+ "data": childRev.GetData(),
+ "version": childRev.getVersion(),
})
//
@@ -404,14 +396,15 @@
childRev.GetBranch().LatestLock.Lock()
// Update child
- updatedChildRev := childRev.UpdateData(data, childRev.GetBranch())
+ updatedChildRev := childRev.UpdateData(ctx, data, childRev.GetBranch())
updatedChildRev.GetNode().SetProxy(childRev.GetNode().GetProxy())
updatedChildRev.SetupWatch(updatedChildRev.GetName())
updatedChildRev.SetLastUpdate()
+ updatedChildRev.(*PersistedRevision).setVersion(version)
// Update cache
- GetRevCache().Cache.Store(updatedChildRev.GetName(), updatedChildRev)
+ GetRevCache().Set(updatedChildRev.GetName(), updatedChildRev)
childRev.Drop(txid, false)
childRev.GetBranch().LatestLock.Unlock()
@@ -423,7 +416,7 @@
// BEGIN lock parent -- Update parent
parent.GetBranch(NONE).LatestLock.Lock()
- updatedRev := parent.GetBranch(NONE).Latest.UpdateChildren(typeName, children, parent.GetBranch(NONE))
+ updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
parent.GetBranch(NONE).LatestLock.Unlock()
@@ -441,14 +434,8 @@
response = updatedChildRev
}
} else {
- // Data is the same. Continue to the next entry
- log.Debugw("same-revision-data", log.Fields{
- "key": childRev.GetHash(),
- "name": childRev.GetName(),
- "data": childRev.GetData(),
- })
if childRev != nil {
- log.Debugw("keeping-same-revision-data", log.Fields{
+ log.Debugw("keeping-revision-data", log.Fields{
"key": childRev.GetHash(),
"name": childRev.GetName(),
"data": childRev.GetData(),
@@ -456,7 +443,10 @@
// Update timestamp to reflect when it was last read and to reset tracked timeout
childRev.SetLastUpdate()
- GetRevCache().Cache.Store(childRev.GetName(), childRev)
+ if childRev.getVersion() < version {
+ childRev.(*PersistedRevision).setVersion(version)
+ }
+ GetRevCache().Set(childRev.GetName(), childRev)
response = childRev
}
}
@@ -479,6 +469,10 @@
// We need to start watching this entry for future changes
childRev.SetName(typeName + "/" + keyValue)
childRev.SetupWatch(childRev.GetName())
+ childRev.(*PersistedRevision).setVersion(version)
+
+ // Add entry to cache
+ GetRevCache().Set(childRev.GetName(), childRev)
pr.GetBranch().LatestLock.Unlock()
// END child lock
@@ -490,7 +484,7 @@
// BEGIN parent lock
parent.GetBranch(NONE).LatestLock.Lock()
children = append(children, childRev)
- updatedRev := parent.GetBranch(NONE).Latest.UpdateChildren(typeName, children, parent.GetBranch(NONE))
+ updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
updatedRev.GetNode().SetProxy(parent.GetBranch(NONE).Node.GetProxy())
parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
parent.GetBranch(NONE).LatestLock.Unlock()
@@ -512,7 +506,7 @@
// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
// by adding missing entries, updating changed entries and ignoring unchanged ones
-func (pr *PersistedRevision) LoadFromPersistence(path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
+func (pr *PersistedRevision) LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
pr.mutex.Lock()
defer pr.mutex.Unlock()
@@ -539,7 +533,7 @@
nodeType = pr.GetBranch().Node.Type
} else {
path = partition[1]
- nodeType = pr.GetBranch().Node.Root.Type
+ nodeType = pr.GetBranch().Node.GetRoot().Type
}
field := ChildrenFields(nodeType)[name]
@@ -574,7 +568,7 @@
// based on the field's key attribute
_, key := GetAttributeValue(data.Interface(), field.Key, 0)
- if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(), txid); entry != nil {
+ if entry := pr.verifyPersistedEntry(ctx, data.Interface(), name, field.Key, key.String(), txid, blob.Version); entry != nil {
response = append(response, entry)
}
} else {
@@ -601,7 +595,7 @@
}
keyValue := field.KeyFromStr(key)
- if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string), txid); entry != nil {
+ if entry := pr.verifyPersistedEntry(ctx, data.Interface(), name, field.Key, keyValue.(string), txid, blob.Version); entry != nil {
response = append(response, entry)
}
}