VOL-1900 Fix lint warnings in db
Change-Id: Iaa4e5c271c9e1d7c8ebce1e13c7e723ea4762304
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index e0fcb10..3214054 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -20,14 +20,15 @@
"bytes"
"compress/gzip"
"context"
+ "reflect"
+ "strings"
+ "sync"
+
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/opencord/voltha-lib-go/v2/pkg/db"
"github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
- "reflect"
- "strings"
- "sync"
)
// PersistedRevision holds information of revision meant to be saved in a persistent storage
@@ -51,7 +52,7 @@
var watchCacheInstance *watchCache
var watchCacheOne sync.Once
-func Watches() *watchCache {
+func watches() *watchCache {
watchCacheOne.Do(func() {
watchCacheInstance = &watchCache{Cache: sync.Map{}}
})
@@ -101,12 +102,14 @@
if pr.Compress {
var b bytes.Buffer
w := gzip.NewWriter(&b)
- w.Write(blob)
+ if _, err := w.Write(blob); err != nil {
+ log.Errorw("Unable to write a compressed form of p to the underlying io.Writer.", log.Fields{"error": err})
+ }
w.Close()
blob = b.Bytes()
}
- GetRevCache().Set(pr.GetName(), pr)
+ getRevCache().Set(pr.GetName(), pr)
if err := pr.kvStore.Put(pr.GetName(), blob); err != nil {
log.Warnw("problem-storing-revision", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
} else {
@@ -116,13 +119,14 @@
}
}
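+// SetupWatch sets up a watch for key; it returns early when key is empty or when the key/revision-hash pair is already recorded in the watch cache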
func (pr *PersistedRevision) SetupWatch(key string) {
if key == "" {
log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
return
}
- if _, exists := Watches().Cache.LoadOrStore(key+"-"+pr.GetHash(), struct{}{}); exists {
+ if _, exists := watches().Cache.LoadOrStore(key+"-"+pr.GetHash(), struct{}{}); exists {
return
}
@@ -151,111 +155,114 @@
StopWatchLoop:
for {
latestRev := pr.GetBranch().GetLatest()
+ event, ok := <-pr.events
+ if !ok {
+ log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
+ break StopWatchLoop
+ }
+ log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": latestRev.GetName()})
- select {
- case event, ok := <-pr.events:
- if !ok {
- log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
- break StopWatchLoop
+ switch event.EventType {
+ case kvstore.DELETE:
+ log.Debugw("delete-from-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
+
+ // Remove reference from cache
+ getRevCache().Delete(latestRev.GetName())
+
+ // Remove reference from parent
+ parent := pr.GetBranch().Node.GetRoot()
+ parent.GetBranch(NONE).Latest.ChildDropByName(latestRev.GetName())
+
+ break StopWatchLoop
+
+ case kvstore.PUT:
+ log.Debugw("update-in-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
+ if latestRev.getVersion() >= event.Version {
+ log.Debugw("skipping-matching-or-older-revision", log.Fields{
+ "watch": latestRev.GetName(),
+ "watch-version": event.Version,
+ "latest-version": latestRev.getVersion(),
+ })
+ continue
+ } else {
+ log.Debugw("watch-revision-is-newer", log.Fields{
+ "watch": latestRev.GetName(),
+ "watch-version": event.Version,
+ "latest-version": latestRev.getVersion(),
+ })
}
- log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": latestRev.GetName()})
- switch event.EventType {
- case kvstore.DELETE:
- log.Debugw("delete-from-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
+ data := reflect.New(reflect.TypeOf(latestRev.GetData()).Elem())
- // Remove reference from cache
- GetRevCache().Delete(latestRev.GetName())
+ if err := proto.Unmarshal(event.Value.([]byte), data.Interface().(proto.Message)); err != nil {
+ log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "error": err})
+ } else {
+ log.Debugw("un-marshaled-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "data": data.Interface()})
- // Remove reference from parent
- parent := pr.GetBranch().Node.GetRoot()
- parent.GetBranch(NONE).Latest.ChildDropByName(latestRev.GetName())
+ var pathLock string
- break StopWatchLoop
-
- case kvstore.PUT:
- log.Debugw("update-in-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
- if latestRev.getVersion() >= event.Version {
- log.Debugw("skipping-matching-or-older-revision", log.Fields{
- "watch": latestRev.GetName(),
- "watch-version": event.Version,
- "latest-version": latestRev.getVersion(),
- })
- continue
- } else {
- log.Debugw("watch-revision-is-newer", log.Fields{
- "watch": latestRev.GetName(),
- "watch-version": event.Version,
- "latest-version": latestRev.getVersion(),
- })
+ // The watch reported new persistence data.
+ // Construct an object that will be used to update the memory
+ blobs := make(map[string]*kvstore.KVPair)
+ key, _ := kvstore.ToString(event.Key)
+ blobs[key] = &kvstore.KVPair{
+ Key: key,
+ Value: event.Value,
+ Session: "",
+ Lease: 0,
+ Version: event.Version,
}
- data := reflect.New(reflect.TypeOf(latestRev.GetData()).Elem())
+ if latestRev.getNode().GetProxy() != nil {
+ //
+ // If a proxy exists for this revision, use it to lock access to the path
+ // and prevent simultaneous updates to the object in memory
+ //
- if err := proto.Unmarshal(event.Value.([]byte), data.Interface().(proto.Message)); err != nil {
- log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "error": err})
- } else {
- log.Debugw("un-marshaled-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "data": data.Interface()})
-
- var pathLock string
- var blobs map[string]*kvstore.KVPair
-
- // The watch reported new persistence data.
- // Construct an object that will be used to update the memory
- blobs = make(map[string]*kvstore.KVPair)
- key, _ := kvstore.ToString(event.Key)
- blobs[key] = &kvstore.KVPair{
- Key: key,
- Value: event.Value,
- Session: "",
- Lease: 0,
- Version: event.Version,
+ //If the proxy already has a request in progress, then there is no need to process the watch
+ if latestRev.getNode().GetProxy().GetOperation() != ProxyNone {
+ log.Debugw("operation-in-progress", log.Fields{
+ "key": latestRev.GetHash(),
+ "path": latestRev.getNode().GetProxy().getFullPath(),
+ "operation": latestRev.getNode().GetProxy().operation.String(),
+ })
+ continue
}
- if latestRev.GetNode().GetProxy() != nil {
- //
- // If a proxy exists for this revision, use it to lock access to the path
- // and prevent simultaneous updates to the object in memory
- //
+ pathLock, _ = latestRev.getNode().GetProxy().parseForControlledPath(latestRev.getNode().GetProxy().getFullPath())
- //If the proxy already has a request in progress, then there is no need to process the watch
- if latestRev.GetNode().GetProxy().GetOperation() != PROXY_NONE {
- log.Debugw("operation-in-progress", log.Fields{
- "key": latestRev.GetHash(),
- "path": latestRev.GetNode().GetProxy().getFullPath(),
- "operation": latestRev.GetNode().GetProxy().operation.String(),
- })
- continue
- }
+ // Reserve the path to prevent others from modifying it while we reload from persistence
+ if _, err = latestRev.getNode().GetProxy().getRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
+ log.Errorw("Unable to acquire a key and set it to a given value", log.Fields{"error": err})
+ }
+ latestRev.getNode().GetProxy().SetOperation(ProxyWatch)
- pathLock, _ = latestRev.GetNode().GetProxy().parseForControlledPath(latestRev.GetNode().GetProxy().getFullPath())
+ // Load changes and apply to memory
+ if _, err = latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs); err != nil {
+ log.Errorw("Unable to refresh the memory by adding missing entries", log.Fields{"error": err})
+ }
- // Reserve the path to prevent others to modify while we reload from persistence
- latestRev.GetNode().GetProxy().GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
- latestRev.GetNode().GetProxy().SetOperation(PROXY_WATCH)
+ // Release path
+ if err = latestRev.getNode().GetProxy().getRoot().KvStore.Client.ReleaseReservation(pathLock + "_"); err != nil {
+ log.Errorw("Unable to release reservation for a specific key", log.Fields{"error": err})
+ }
+ } else {
+ // This block should be reached only if coming from a non-proxied request
+ log.Debugw("revision-with-no-proxy", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
- // Load changes and apply to memory
- latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs)
-
- // Release path
- latestRev.GetNode().GetProxy().GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
-
- } else {
- // This block should be reached only if coming from a non-proxied request
- log.Debugw("revision-with-no-proxy", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
-
- // Load changes and apply to memory
- latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs)
+ // Load changes and apply to memory
+ if _, err = latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs); err != nil {
+ log.Errorw("Unable to refresh the memory by adding missing entries", log.Fields{"error": err})
}
}
-
- default:
- log.Debugw("unhandled-event", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "type": event.EventType})
}
+
+ default:
+ log.Debugw("unhandled-event", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "type": event.EventType})
}
}
- Watches().Cache.Delete(pr.GetName() + "-" + pr.GetHash())
+ watches().Cache.Delete(pr.GetName() + "-" + pr.GetHash())
log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
}
@@ -342,7 +349,7 @@
pr.Revision.Drop(txid, includeConfig)
}
-// Drop takes care of eliminating a revision hash that is no longer needed
+// StorageDrop takes care of eliminating a revision hash that is no longer needed
// and its associated config when required
func (pr *PersistedRevision) StorageDrop(txid string, includeConfig bool) {
log.Debugw("dropping-revision", log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash, "key": pr.GetName(), "isStored": pr.isStored})
@@ -381,7 +388,7 @@
copy(children, parent.GetBranch(NONE).Latest.GetChildren(typeName))
// Verify if a child with the provided key value can be found
- if childIdx, childRev := pr.GetNode().findRevByKey(children, keyName, keyValue); childRev != nil {
+ if childIdx, childRev := pr.getNode().findRevByKey(children, keyName, keyValue); childRev != nil {
// A child matching the provided key exists in memory
// Verify if the data differs from what was retrieved from persistence
// Also check if we are treating a newer revision of the data or not
@@ -404,13 +411,13 @@
// Update child
updatedChildRev := childRev.UpdateData(ctx, data, childRev.GetBranch())
- updatedChildRev.GetNode().SetProxy(childRev.GetNode().GetProxy())
+ updatedChildRev.getNode().SetProxy(childRev.getNode().GetProxy())
updatedChildRev.SetupWatch(updatedChildRev.GetName())
updatedChildRev.SetLastUpdate()
updatedChildRev.(*PersistedRevision).setVersion(version)
// Update cache
- GetRevCache().Set(updatedChildRev.GetName(), updatedChildRev)
+ getRevCache().Set(updatedChildRev.GetName(), updatedChildRev)
childRev.Drop(txid, false)
childRev.GetBranch().LatestLock.Unlock()
@@ -440,23 +447,21 @@
response = updatedChildRev
}
} else {
- if childRev != nil {
- log.Debugw("keeping-revision-data", log.Fields{
- "key": childRev.GetHash(),
- "name": childRev.GetName(),
- "data": childRev.GetData(),
- "in-memory-version": childRev.getVersion(),
- "persistence-version": version,
- })
+ log.Debugw("keeping-revision-data", log.Fields{
+ "key": childRev.GetHash(),
+ "name": childRev.GetName(),
+ "data": childRev.GetData(),
+ "in-memory-version": childRev.getVersion(),
+ "persistence-version": version,
+ })
- // Update timestamp to reflect when it was last read and to reset tracked timeout
- childRev.SetLastUpdate()
- if childRev.getVersion() < version {
- childRev.(*PersistedRevision).setVersion(version)
- }
- GetRevCache().Set(childRev.GetName(), childRev)
- response = childRev
+ // Update timestamp to reflect when it was last read and to reset tracked timeout
+ childRev.SetLastUpdate()
+ if childRev.getVersion() < version {
+ childRev.(*PersistedRevision).setVersion(version)
}
+ getRevCache().Set(childRev.GetName(), childRev)
+ response = childRev
}
} else {
@@ -481,7 +486,7 @@
childRev.(*PersistedRevision).setVersion(version)
// Add entry to cache
- GetRevCache().Set(childRev.GetName(), childRev)
+ getRevCache().Set(childRev.GetName(), childRev)
pr.GetBranch().LatestLock.Unlock()
// END child lock
@@ -494,7 +499,7 @@
parent.GetBranch(NONE).LatestLock.Lock()
children = append(children, childRev)
updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
- updatedRev.GetNode().SetProxy(parent.GetBranch(NONE).Node.GetProxy())
+ updatedRev.getNode().SetProxy(parent.GetBranch(NONE).Node.GetProxy())
parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
parent.GetBranch(NONE).LatestLock.Unlock()
// END parent lock
@@ -529,7 +534,7 @@
}
if pr.kvStore != nil && path != "" {
- if blobs == nil || len(blobs) == 0 {
+ if len(blobs) == 0 {
log.Debugw("retrieve-from-kv", log.Fields{"path": path, "txid": txid})
if blobs, err = pr.kvStore.List(path); err != nil {