VOL-1334 : Fixed concurrency issues
- Semaphores were added at the different layers of the model
- Made the proxy interfaces more robust
- Eliminated problems while retrieving latest data in concurrent mode
Change-Id: I7854105d7effa10e5cb704f5d9917569ab184f84
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index b62c569..3682694 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package model
import (
@@ -23,25 +24,30 @@
"github.com/opencord/voltha-go/common/log"
"io/ioutil"
"reflect"
+ "runtime/debug"
+ "sync"
"time"
)
+// PersistedRevision holds information of revision meant to be saved in a persistent storage
type PersistedRevision struct {
+ mutex sync.RWMutex
Revision
Compress bool
kvStore *Backend
}
+// NewPersistedRevision creates a new instance of a PersistentRevision structure
func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
pr := &PersistedRevision{}
- pr.kvStore = branch.Node.Root.KvStore
- pr.Revision = NewNonPersistedRevision(branch, data, children)
+ pr.kvStore = branch.Node.GetRoot().KvStore
+ pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
pr.Finalize()
return pr
}
+// Finalize is responsible of saving the revision in the persistent storage
func (pr *PersistedRevision) Finalize() {
- //pr.Revision.Finalize()
pr.store()
}
@@ -55,6 +61,7 @@
return
}
if ok, _ := pr.kvStore.Get(pr.Revision.GetHash()); ok != nil {
+ log.Debugf("Entry already exists - hash:%s, stack: %s", pr.Revision.GetHash(), string(debug.Stack()))
return
}
@@ -84,10 +91,17 @@
w.Close()
blob = b.Bytes()
}
- pr.kvStore.Put(pr.GetHash(), blob)
+ if err := pr.kvStore.Put(pr.GetHash(), blob); err != nil {
+ log.Warnf("Problem storing revision - error: %s, hash: %s, data: %s", err.Error(), pr.GetHash(),
+ string(blob))
+ } else {
+ log.Debugf("Stored entry - hash:%s, blob: %s, stack: %s", pr.Revision.GetHash(), string(blob),
+ string(debug.Stack()))
+ }
}
}
+// Load retrieves a revision from th persistent storage
func (pr *PersistedRevision) Load(branch *Branch, kvStore *Backend, msgClass interface{}, hash string) Revision {
blob, _ := kvStore.Get(hash)
@@ -132,13 +146,10 @@
return rev
}
-func (pr *PersistedRevision) assignValue(a, b Revision) Revision {
- a = b
- return a
-}
-
+// storeConfig saves the data associated to a revision in the persistent storage
func (pr *PersistedRevision) storeConfig() {
if ok, _ := pr.kvStore.Get(pr.GetConfig().Hash); ok != nil {
+ log.Debugf("Config already exists - hash:%s, stack: %s", pr.GetConfig().Hash, string(debug.Stack()))
return
}
if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
@@ -151,10 +162,19 @@
w.Close()
blob = b.Bytes()
}
- pr.kvStore.Put(pr.GetConfig().Hash, blob)
+
+ if err := pr.kvStore.Put(pr.GetConfig().Hash, blob); err != nil {
+ log.Warnf("Problem storing revision config - error: %s, hash: %s, data: %+v", err.Error(),
+ pr.GetConfig().Hash,
+ pr.GetConfig().Data)
+ } else {
+ log.Debugf("Stored config - hash:%s, blob: %+v, stack: %s", pr.GetConfig().Hash, pr.GetConfig().Data,
+ string(debug.Stack()))
+ }
}
}
+// loadConfig restores the data associated to a revision from the persistent storage
func (pr *PersistedRevision) loadConfig(kvStore *Backend, msgClass interface{}, hash string) interface{} {
blob, _ := kvStore.Get(hash)
start := time.Now()
@@ -185,9 +205,11 @@
return data.Interface()
}
+// UpdateData modifies the information in the data model and saves it in the persistent storage
func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
newNPR := pr.Revision.UpdateData(data, branch)
+ log.Debugf("Updating data %+v", data)
newPR := &PersistedRevision{
Revision: newNPR,
Compress: pr.Compress,
@@ -199,6 +221,7 @@
return newPR
}
+// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
func (pr *PersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
newNPR := pr.Revision.UpdateChildren(name, children, branch)
@@ -213,6 +236,7 @@
return newPR
}
+// UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
newNPR := pr.Revision.UpdateAllChildren(children, branch)
@@ -230,6 +254,8 @@
// Drop takes care of eliminating a revision hash that is no longer needed
// and its associated config when required
func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
+ pr.mutex.Lock()
+ defer pr.mutex.Unlock()
if pr.kvStore != nil && txid == "" {
if includeConfig {
log.Debugf("removing rev config - hash: %s", pr.GetConfig().Hash)
@@ -252,4 +278,6 @@
}
log.Debugf("Attempted to remove revision:%s linked to transaction:%s", pr.GetHash(), txid)
}
+
+ pr.Revision.Drop(txid, includeConfig)
}