[VOL-1550] Support for OLT hard reboot with ONU connected
Change-Id: I67642d847d2308f8abf8e9b90986eeecf65b2a41
diff --git a/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go b/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go
index ea99cf7..226fc3c 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go
@@ -154,6 +154,8 @@
if err := proto.Unmarshal(event.Value.([]byte), data.Interface().(proto.Message)); err != nil {
log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
} else {
+ log.Debugw("un-marshaled-watch-data", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "data": data.Interface()})
+
var pathLock string
var pac *proxyAccessControl
var blobs map[string]*kvstore.KVPair
@@ -182,69 +184,64 @@
log.Debugw("operation-in-progress", log.Fields{
"key": pr.GetHash(),
"path": pr.GetNode().GetProxy().getFullPath(),
- "operation": pr.GetNode().GetRoot().GetProxy().Operation,
+ "operation": pr.GetNode().GetProxy().Operation.String(),
})
- continue
+ // Identify the operation type and determine if the watch event should be applied or not.
+ switch pr.GetNode().GetProxy().Operation {
+ case PROXY_REMOVE:
+ fallthrough
- // TODO/FIXME: keep logic for now in case we need to control based on running operation
- //
- // The code below seems to revert the in-memory/persistence value (occasionally) with
- // the one received from the watch event.
- //
- // The same problem may occur, in the scenario where the core owning a device
- // receives a watch event for an update made by another core, and when the owning core is
- // also processing an update. Need to investigate...
- //
- //switch pr.GetNode().GetRoot().GetProxy().Operation {
- //case PROXY_UPDATE:
- // // We will need to reload once the update operation completes.
- // // Therefore, the data of the current event is most likely out-dated
- // // and should be ignored
- // log.Debugw("reload-required", log.Fields{
- // "key": pr.GetHash(),
- // "path": pr.GetNode().GetProxy().getFullPath(),
- // "operation": pr.GetNode().GetRoot().GetProxy().Operation,
- // })
- //
- // // Eliminate the object constructed earlier
- // blobs = nil
- //
- //case PROXY_ADD:
- // fallthrough
- //
- //case PROXY_REMOVE:
- // fallthrough
- //
- //case PROXY_GET:
- // fallthrough
- //
- //default:
- // // No need to process the event ... move on
- // log.Debugw("", log.Fields{
- // "key": pr.GetHash(),
- // "path": pr.GetNode().GetProxy().getFullPath(),
- // "operation": pr.GetNode().GetRoot().GetProxy().Operation,
- // })
- //
- // continue
- //}
+ case PROXY_ADD:
+ fallthrough
+
+ case PROXY_UPDATE:
+ // We will need to reload once the operation completes.
+ // Therefore, the data of the current event is most likely out-dated
+ // and should be ignored
+ log.Debugw("ignore-watch-event", log.Fields{
+ "key": pr.GetHash(),
+ "path": pr.GetNode().GetProxy().getFullPath(),
+ "operation": pr.GetNode().GetProxy().Operation.String(),
+ })
+
+ continue
+
+ case PROXY_CREATE:
+ fallthrough
+
+ case PROXY_LIST:
+ fallthrough
+
+ case PROXY_GET:
+ fallthrough
+
+ default:
+ log.Debugw("process-watch-event", log.Fields{
+ "key": pr.GetHash(),
+ "path": pr.GetNode().GetProxy().getFullPath(),
+ "operation": pr.GetNode().GetProxy().Operation.String(),
+ })
+ }
}
// Reserve the path to prevent others to modify while we reload from persistence
log.Debugw("reserve-and-lock-path", log.Fields{"key": pr.GetHash(), "path": pathLock})
pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
- pac.SetProxy(pr.GetNode().GetProxy())
pac.lock()
+ pr.GetNode().GetProxy().Operation = PROXY_UPDATE
+ pac.SetProxy(pr.GetNode().GetProxy())
// Load changes and apply to memory
pr.LoadFromPersistence(pr.GetName(), "", blobs)
log.Debugw("release-and-unlock-path", log.Fields{"key": pr.GetHash(), "path": pathLock})
+ pac.getProxy().Operation = PROXY_GET
pac.unlock()
PAC().ReleasePath(pathLock)
} else {
+ // This block should be reached only if coming from a non-proxied request
log.Debugw("revision-with-no-proxy", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
// Load changes and apply to memory
@@ -380,42 +377,59 @@
// verifyPersistedEntry validates if the provided data is available or not in memory and applies updates as required
func (pr *PersistedRevision) verifyPersistedEntry(data interface{}, typeName string, keyName string, keyValue string, txid string) (response Revision) {
- //rev := pr
+ // Parent which holds the current node entry
+ parent := pr.GetBranch().Node.Root
- children := make([]Revision, len(pr.GetBranch().GetLatest().GetChildren(typeName)))
- copy(children, pr.GetBranch().GetLatest().GetChildren(typeName))
+ // Get a copy of the parent's children
+ children := make([]Revision, len(parent.GetBranch(NONE).Latest.GetChildren(typeName)))
+ copy(children, parent.GetBranch(NONE).Latest.GetChildren(typeName))
- // Verify if the revision contains a child that matches that key
- if childIdx, childRev := pr.GetNode().findRevByKey(pr.GetBranch().GetLatest().GetChildren(typeName), keyName,
- keyValue); childRev != nil {
+ // Verify if a child with the provided key value can be found
+ if childIdx, childRev := pr.GetNode().findRevByKey(children, keyName, keyValue); childRev != nil {
// A child matching the provided key exists in memory
- // Verify if the data differs to what was retrieved from persistence
+ // Verify if the data differs from what was retrieved from persistence
if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() {
- log.Debugw("verify-persisted-entry--data-is-different", log.Fields{
+ log.Debugw("revision-data-is-different", log.Fields{
"key": childRev.GetHash(),
"name": childRev.GetName(),
"data": childRev.GetData(),
})
+ //
// Data has changed; replace the child entry and update the parent revision
- childRev.Drop(txid, false)
+ //
+
+ // BEGIN Lock child -- prevent any incoming changes
+ childRev.GetBranch().LatestLock.Lock()
+
+ // Update child
updatedChildRev := childRev.UpdateData(data, childRev.GetBranch())
+
updatedChildRev.GetNode().SetProxy(childRev.GetNode().GetProxy())
updatedChildRev.SetupWatch(updatedChildRev.GetName())
+ childRev.Drop(txid, false)
+ updatedChildRev.SetLastUpdate()
- if childIdx >= 0 {
- children[childIdx] = updatedChildRev
- } else {
- children = append(children, updatedChildRev)
- }
+ // Update cache
+ GetRevCache().Cache.Store(updatedChildRev.GetName(), updatedChildRev)
- pr.GetBranch().LatestLock.Lock()
- updatedRev := pr.GetBranch().Node.Latest().UpdateChildren(typeName, children, pr.GetBranch())
- pr.GetBranch().Node.makeLatest(pr.GetBranch(), updatedRev, nil)
- pr.GetBranch().LatestLock.Unlock()
+ childRev.GetBranch().LatestLock.Unlock()
+ // END lock child
+
+ // Update child entry
+ children[childIdx] = updatedChildRev
+
+ // BEGIN lock parent -- Update parent
+ parent.GetBranch(NONE).LatestLock.Lock()
+
+ updatedRev := parent.GetBranch(NONE).Latest.UpdateChildren(typeName, children, parent.GetBranch(NONE))
+ parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
+
+ parent.GetBranch(NONE).LatestLock.Unlock()
+ // END lock parent
// Drop the previous child revision
- pr.GetBranch().Node.Latest().ChildDrop(typeName, childRev.GetHash())
+ parent.GetBranch(NONE).Latest.ChildDrop(typeName, childRev.GetHash())
if updatedChildRev != nil {
log.Debugw("verify-persisted-entry--adding-child", log.Fields{
@@ -427,49 +441,64 @@
}
} else {
// Data is the same. Continue to the next entry
- log.Debugw("verify-persisted-entry--same-data", log.Fields{
+ log.Debugw("same-revision-data", log.Fields{
"key": childRev.GetHash(),
"name": childRev.GetName(),
"data": childRev.GetData(),
})
if childRev != nil {
- log.Debugw("verify-persisted-entry--keeping-child", log.Fields{
+ log.Debugw("keeping-same-revision-data", log.Fields{
"key": childRev.GetHash(),
"name": childRev.GetName(),
"data": childRev.GetData(),
})
+
+ // Update timestamp to reflect when it was last read and to reset tracked timeout
+ childRev.SetLastUpdate()
+ GetRevCache().Cache.Store(childRev.GetName(), childRev)
response = childRev
}
}
} else {
// There is no available child with that key value.
// Create a new child and update the parent revision.
- log.Debugw("verify-persisted-entry--no-such-entry", log.Fields{
+ log.Debugw("no-such-revision-entry", log.Fields{
"key": keyValue,
"name": typeName,
"data": data,
})
+ // BEGIN child lock
+ pr.GetBranch().LatestLock.Lock()
+
// Construct a new child node with the retrieved persistence data
childRev = pr.GetBranch().Node.MakeNode(data, txid).Latest(txid)
// We need to start watching this entry for future changes
childRev.SetName(typeName + "/" + keyValue)
-
- // Add the child to the parent revision
- pr.GetBranch().LatestLock.Lock()
- children = append(children, childRev)
- updatedRev := pr.GetBranch().Node.Latest().UpdateChildren(typeName, children, pr.GetBranch())
- updatedRev.GetNode().SetProxy(pr.GetNode().GetProxy())
childRev.SetupWatch(childRev.GetName())
- //rev.GetBranch().Node.Latest().Drop(txid, false)
- pr.GetBranch().Node.makeLatest(pr.GetBranch(), updatedRev, nil)
+ pr.GetBranch().Node.makeLatest(pr.GetBranch(), childRev, nil)
pr.GetBranch().LatestLock.Unlock()
+ // END child lock
+
+ //
+ // Add the child to the parent revision
+ //
+
+ // BEGIN parent lock
+ parent.GetBranch(NONE).LatestLock.Lock()
+ children = append(children, childRev)
+ updatedRev := parent.GetBranch(NONE).Latest.UpdateChildren(typeName, children, parent.GetBranch(NONE))
+ updatedRev.GetNode().SetProxy(parent.GetBranch(NONE).Node.GetProxy())
+
+ parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
+ parent.GetBranch(NONE).LatestLock.Unlock()
+ // END parent lock
// Child entry is valid and can be included in the response object
if childRev != nil {
- log.Debugw("verify-persisted-entry--adding-child", log.Fields{
+ log.Debugw("adding-revision-to-response", log.Fields{
"key": childRev.GetHash(),
"name": childRev.GetName(),
"data": childRev.GetData(),
@@ -483,7 +512,8 @@
// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
// by adding missing entries, updating changed entries and ignoring unchanged ones
-func (pr *PersistedRevision) LoadFromPersistence(path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
+func (pr *PersistedRevision) LoadFromPersistence(
+ path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
pr.mutex.Lock()
defer pr.mutex.Unlock()
@@ -516,7 +546,7 @@
field := ChildrenFields(nodeType)[name]
if field != nil && field.IsContainer {
- log.Debugw("load-from-persistence--start-blobs", log.Fields{
+ log.Debugw("parsing-data-blobs", log.Fields{
"path": path,
"name": name,
"size": len(blobs),
@@ -528,13 +558,19 @@
data := reflect.New(field.ClassType.Elem())
if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
- log.Errorw("load-from-persistence--failed-to-unmarshal", log.Fields{
+ log.Errorw("failed-to-unmarshal", log.Fields{
"path": path,
"txid": txid,
"error": err,
})
} else if path == "" {
if field.Key != "" {
+ log.Debugw("no-path-with-container-key", log.Fields{
+ "path": path,
+ "txid": txid,
+ "data": data.Interface(),
+ })
+
// Retrieve the key identifier value from the data structure
// based on the field's key attribute
_, key := GetAttributeValue(data.Interface(), field.Key, 0)
@@ -543,9 +579,20 @@
txid); entry != nil {
response = append(response, entry)
}
+ } else {
+ log.Debugw("path-with-no-container-key", log.Fields{
+ "path": path,
+ "txid": txid,
+ "data": data.Interface(),
+ })
}
} else if field.Key != "" {
+ log.Debugw("path-with-container-key", log.Fields{
+ "path": path,
+ "txid": txid,
+ "data": data.Interface(),
+ })
// The request is for a specific entry/id
partition := strings.SplitN(path, "/", 2)
key := partition[0]
@@ -563,10 +610,9 @@
}
}
- log.Debugw("load-from-persistence--end-blobs", log.Fields{"path": path, "name": name})
+ log.Debugw("no-more-data-blobs", log.Fields{"path": path, "name": name})
} else {
- log.Debugw("load-from-persistence--cannot-process-field", log.Fields{
-
+ log.Debugw("cannot-process-field", log.Fields{
"type": pr.GetBranch().Node.Type,
"name": name,
})