blob: 15e438c48e2a87dbe3c68de2ed8c307e833d741b [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Stephane Barbariedc5022d2018-11-19 15:21:44 -050016
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040017package model
18
19import (
20 "bytes"
21 "compress/gzip"
Stephane Barbarieef6650d2019-07-18 12:15:09 -040022 "context"
npujar9a30c702019-11-14 17:06:39 +053023 "reflect"
24 "strings"
25 "sync"
26
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040027 "github.com/golang/protobuf/proto"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080028 "github.com/opencord/voltha-lib-go/v3/pkg/db"
29 "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
30 "github.com/opencord/voltha-lib-go/v3/pkg/log"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040031)
32
Stephane Barbariedc5022d2018-11-19 15:21:44 -050033// PersistedRevision holds information of revision meant to be saved in a persistent storage
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040034type PersistedRevision struct {
Stephane Barbarieec0919b2018-09-05 14:14:29 -040035 Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040036 Compress bool
Stephane Barbariee0a4c792019-01-16 11:26:29 -050037
Stephane Barbarieef6650d2019-07-18 12:15:09 -040038 events chan *kvstore.Event
sbarbari17d7e222019-11-05 10:02:29 -050039 kvStore *db.Backend
Stephane Barbarieef6650d2019-07-18 12:15:09 -040040 mutex sync.RWMutex
41 versionMutex sync.RWMutex
42 Version int64
43 isStored bool
Stephane Barbarie40fd3b22019-04-23 21:50:47 -040044}
45
Stephane Barbariedc5022d2018-11-19 15:21:44 -050046// NewPersistedRevision creates a new instance of a PersistentRevision structure
Stephane Barbarieec0919b2018-09-05 14:14:29 -040047func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040048 pr := &PersistedRevision{}
Stephane Barbariedc5022d2018-11-19 15:21:44 -050049 pr.kvStore = branch.Node.GetRoot().KvStore
Stephane Barbarieef6650d2019-07-18 12:15:09 -040050 pr.Version = 1
Stephane Barbariedc5022d2018-11-19 15:21:44 -050051 pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040052 return pr
53}
54
Stephane Barbarieef6650d2019-07-18 12:15:09 -040055func (pr *PersistedRevision) getVersion() int64 {
56 pr.versionMutex.RLock()
57 defer pr.versionMutex.RUnlock()
58 return pr.Version
59}
60
61func (pr *PersistedRevision) setVersion(version int64) {
62 pr.versionMutex.Lock()
63 defer pr.versionMutex.Unlock()
64 pr.Version = version
65}
66
Stephane Barbariedc5022d2018-11-19 15:21:44 -050067// Finalize is responsible of saving the revision in the persistent storage
npujar467fe752020-01-16 20:17:45 +053068func (pr *PersistedRevision) Finalize(ctx context.Context, skipOnExist bool) {
69 pr.store(ctx, skipOnExist)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040070}
71
npujar467fe752020-01-16 20:17:45 +053072func (pr *PersistedRevision) store(ctx context.Context, skipOnExist bool) {
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040073 if pr.GetBranch().Txid != "" {
74 return
75 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040076
Stephane Barbarie7512fc82019-05-07 12:25:46 -040077 log.Debugw("ready-to-store-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
Stephane Barbarie1ab43272018-12-08 21:42:13 -050078
Stephane Barbarieef6650d2019-07-18 12:15:09 -040079 // clone the revision data to avoid any race conditions with processes
80 // accessing the same data
81 cloned := proto.Clone(pr.GetConfig().Data.(proto.Message))
82
83 if blob, err := proto.Marshal(cloned); err != nil {
84 log.Errorw("problem-to-marshal", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040085 } else {
86 if pr.Compress {
87 var b bytes.Buffer
88 w := gzip.NewWriter(&b)
npujar9a30c702019-11-14 17:06:39 +053089 if _, err := w.Write(blob); err != nil {
90 log.Errorw("Unable to write a compressed form of p to the underlying io.Writer.", log.Fields{"error": err})
91 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040092 w.Close()
93 blob = b.Bytes()
94 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -050095
npujar9a30c702019-11-14 17:06:39 +053096 getRevCache().Set(pr.GetName(), pr)
npujar467fe752020-01-16 20:17:45 +053097 if err := pr.kvStore.Put(ctx, pr.GetName(), blob); err != nil {
Stephane Barbarie7512fc82019-05-07 12:25:46 -040098 log.Warnw("problem-storing-revision", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
Stephane Barbariedc5022d2018-11-19 15:21:44 -050099 } else {
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400100 log.Debugw("storing-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data, "version": pr.getVersion()})
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500101 pr.isStored = true
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500102 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400103 }
104}
105
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500106// UpdateData modifies the information in the data model and saves it in the persistent storage
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400107func (pr *PersistedRevision) UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500108 log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
109
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400110 newNPR := pr.Revision.UpdateData(ctx, data, branch)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400111
112 newPR := &PersistedRevision{
khenaidoo67b22152020-03-02 16:01:25 -0500113 Revision: newNPR,
114 Compress: pr.Compress,
115 kvStore: pr.kvStore,
116 events: pr.events,
117 Version: pr.getVersion(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400118 }
119
120 if newPR.GetHash() != pr.GetHash() {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400121 newPR.isStored = false
122 pr.Drop(branch.Txid, false)
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400123 pr.Drop(branch.Txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400124 } else {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400125 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400126 }
127
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400128 return newPR
129}
130
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500131// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400132func (pr *PersistedRevision) UpdateChildren(ctx context.Context, name string, children []Revision, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500133 log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
134
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400135 newNPR := pr.Revision.UpdateChildren(ctx, name, children, branch)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400136
137 newPR := &PersistedRevision{
khenaidoo67b22152020-03-02 16:01:25 -0500138 Revision: newNPR,
139 Compress: pr.Compress,
140 kvStore: pr.kvStore,
141 events: pr.events,
142 Version: pr.getVersion(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400143 }
144
145 if newPR.GetHash() != pr.GetHash() {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400146 newPR.isStored = false
147 pr.Drop(branch.Txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400148 } else {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400149 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400150 }
151
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400152 return newPR
153}
154
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500155// UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
npujar467fe752020-01-16 20:17:45 +0530156func (pr *PersistedRevision) UpdateAllChildren(ctx context.Context, children map[string][]Revision, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500157 log.Debugw("updating-all-persisted-children", log.Fields{"hash": pr.GetHash()})
158
npujar467fe752020-01-16 20:17:45 +0530159 newNPR := pr.Revision.UpdateAllChildren(ctx, children, branch)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400160
161 newPR := &PersistedRevision{
khenaidoo67b22152020-03-02 16:01:25 -0500162 Revision: newNPR,
163 Compress: pr.Compress,
164 kvStore: pr.kvStore,
165 events: pr.events,
166 Version: pr.getVersion(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400167 }
168
169 if newPR.GetHash() != pr.GetHash() {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400170 newPR.isStored = false
171 pr.Drop(branch.Txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400172 } else {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400173 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400174 }
175
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400176 return newPR
177}
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400178
179// Drop takes care of eliminating a revision hash that is no longer needed
180// and its associated config when required
181func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400182 pr.Revision.Drop(txid, includeConfig)
183}
184
npujar9a30c702019-11-14 17:06:39 +0530185// StorageDrop takes care of eliminating a revision hash that is no longer needed
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400186// and its associated config when required
npujar467fe752020-01-16 20:17:45 +0530187func (pr *PersistedRevision) StorageDrop(ctx context.Context, txid string, includeConfig bool) {
khenaidoo49085352020-01-13 19:15:43 -0500188 log.Debugw("dropping-revision", log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash, "key": pr.GetName(), "isStored": pr.isStored})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500189
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500190 pr.mutex.Lock()
191 defer pr.mutex.Unlock()
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400192 if pr.kvStore != nil && txid == "" {
npujar467fe752020-01-16 20:17:45 +0530193 if err := pr.kvStore.Delete(ctx, pr.GetName()); err != nil {
khenaidoo49085352020-01-13 19:15:43 -0500194 log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
195 } else {
196 pr.isStored = false
197 }
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400198 } else {
199 if includeConfig {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500200 log.Debugw("attempted-to-remove-transacted-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400201 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500202 log.Debugw("attempted-to-remove-transacted-revision", log.Fields{"hash": pr.GetHash(), "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400203 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500204
205 pr.Revision.Drop(txid, includeConfig)
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400206}
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400207
208// verifyPersistedEntry validates if the provided data is available or not in memory and applies updates as required
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400209func (pr *PersistedRevision) verifyPersistedEntry(ctx context.Context, data interface{}, typeName string, keyName string,
210 keyValue string, txid string, version int64) (response Revision) {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400211 // Parent which holds the current node entry
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400212 parent := pr.GetBranch().Node.GetRoot()
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400213
Stephane Barbarie802aca42019-05-21 12:19:28 -0400214 // Get a copy of the parent's children
215 children := make([]Revision, len(parent.GetBranch(NONE).Latest.GetChildren(typeName)))
216 copy(children, parent.GetBranch(NONE).Latest.GetChildren(typeName))
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400217
Stephane Barbarie802aca42019-05-21 12:19:28 -0400218 // Verify if a child with the provided key value can be found
npujar9a30c702019-11-14 17:06:39 +0530219 if childIdx, childRev := pr.getNode().findRevByKey(children, keyName, keyValue); childRev != nil {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400220 // A child matching the provided key exists in memory
Stephane Barbarie802aca42019-05-21 12:19:28 -0400221 // Verify if the data differs from what was retrieved from persistence
Stephane Barbariec92d1072019-06-07 16:21:49 -0400222 // Also check if we are treating a newer revision of the data or not
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400223 if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() && childRev.getVersion() < version {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400224 log.Debugw("revision-data-is-different", log.Fields{
David Bainbridgebdae73c2019-10-23 17:05:41 +0000225 "key": childRev.GetHash(),
226 "name": childRev.GetName(),
227 "data": childRev.GetData(),
228 "in-memory-version": childRev.getVersion(),
229 "persisted-version": version,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400230 })
231
Stephane Barbarie802aca42019-05-21 12:19:28 -0400232 //
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400233 // Data has changed; replace the child entry and update the parent revision
Stephane Barbarie802aca42019-05-21 12:19:28 -0400234 //
235
236 // BEGIN Lock child -- prevent any incoming changes
237 childRev.GetBranch().LatestLock.Lock()
238
239 // Update child
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400240 updatedChildRev := childRev.UpdateData(ctx, data, childRev.GetBranch())
Stephane Barbarie802aca42019-05-21 12:19:28 -0400241
npujar9a30c702019-11-14 17:06:39 +0530242 updatedChildRev.getNode().SetProxy(childRev.getNode().GetProxy())
Stephane Barbarie802aca42019-05-21 12:19:28 -0400243 updatedChildRev.SetLastUpdate()
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400244 updatedChildRev.(*PersistedRevision).setVersion(version)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400245
Stephane Barbarie802aca42019-05-21 12:19:28 -0400246 // Update cache
npujar9a30c702019-11-14 17:06:39 +0530247 getRevCache().Set(updatedChildRev.GetName(), updatedChildRev)
Stephane Barbariec92d1072019-06-07 16:21:49 -0400248 childRev.Drop(txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400249
Stephane Barbarie802aca42019-05-21 12:19:28 -0400250 childRev.GetBranch().LatestLock.Unlock()
251 // END lock child
252
253 // Update child entry
254 children[childIdx] = updatedChildRev
255
256 // BEGIN lock parent -- Update parent
257 parent.GetBranch(NONE).LatestLock.Lock()
258
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400259 updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
Stephane Barbarie802aca42019-05-21 12:19:28 -0400260 parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
261
262 parent.GetBranch(NONE).LatestLock.Unlock()
263 // END lock parent
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400264
265 // Drop the previous child revision
Stephane Barbarie802aca42019-05-21 12:19:28 -0400266 parent.GetBranch(NONE).Latest.ChildDrop(typeName, childRev.GetHash())
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400267
268 if updatedChildRev != nil {
269 log.Debugw("verify-persisted-entry--adding-child", log.Fields{
270 "key": updatedChildRev.GetHash(),
271 "name": updatedChildRev.GetName(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400272 "data": updatedChildRev.GetData(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400273 })
274 response = updatedChildRev
275 }
276 } else {
npujar9a30c702019-11-14 17:06:39 +0530277 log.Debugw("keeping-revision-data", log.Fields{
278 "key": childRev.GetHash(),
279 "name": childRev.GetName(),
280 "data": childRev.GetData(),
281 "in-memory-version": childRev.getVersion(),
282 "persistence-version": version,
283 })
Stephane Barbarie802aca42019-05-21 12:19:28 -0400284
npujar9a30c702019-11-14 17:06:39 +0530285 // Update timestamp to reflect when it was last read and to reset tracked timeout
286 childRev.SetLastUpdate()
287 if childRev.getVersion() < version {
288 childRev.(*PersistedRevision).setVersion(version)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400289 }
npujar9a30c702019-11-14 17:06:39 +0530290 getRevCache().Set(childRev.GetName(), childRev)
291 response = childRev
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400292 }
Stephane Barbariec92d1072019-06-07 16:21:49 -0400293
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400294 } else {
295 // There is no available child with that key value.
296 // Create a new child and update the parent revision.
Stephane Barbarie802aca42019-05-21 12:19:28 -0400297 log.Debugw("no-such-revision-entry", log.Fields{
David Bainbridgebdae73c2019-10-23 17:05:41 +0000298 "key": keyValue,
299 "name": typeName,
300 "data": data,
301 "version": version,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400302 })
303
Stephane Barbarie802aca42019-05-21 12:19:28 -0400304 // BEGIN child lock
305 pr.GetBranch().LatestLock.Lock()
306
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400307 // Construct a new child node with the retrieved persistence data
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400308 childRev = pr.GetBranch().Node.MakeNode(data, txid).Latest(txid)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400309
310 // We need to start watching this entry for future changes
311 childRev.SetName(typeName + "/" + keyValue)
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400312 childRev.(*PersistedRevision).setVersion(version)
313
314 // Add entry to cache
npujar9a30c702019-11-14 17:06:39 +0530315 getRevCache().Set(childRev.GetName(), childRev)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400316
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400317 pr.GetBranch().LatestLock.Unlock()
Stephane Barbarie802aca42019-05-21 12:19:28 -0400318 // END child lock
319
320 //
321 // Add the child to the parent revision
322 //
323
324 // BEGIN parent lock
325 parent.GetBranch(NONE).LatestLock.Lock()
326 children = append(children, childRev)
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400327 updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
npujar9a30c702019-11-14 17:06:39 +0530328 updatedRev.getNode().SetProxy(parent.GetBranch(NONE).Node.GetProxy())
Stephane Barbarie802aca42019-05-21 12:19:28 -0400329 parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
330 parent.GetBranch(NONE).LatestLock.Unlock()
331 // END parent lock
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400332
333 // Child entry is valid and can be included in the response object
334 if childRev != nil {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400335 log.Debugw("adding-revision-to-response", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400336 "key": childRev.GetHash(),
337 "name": childRev.GetName(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400338 "data": childRev.GetData(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400339 })
340 response = childRev
341 }
342 }
343
344 return response
345}
346
347// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
348// by adding missing entries, updating changed entries and ignoring unchanged ones
Thomas Lee Se5a44012019-11-07 20:32:24 +0530349func (pr *PersistedRevision) LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) ([]Revision, error) {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400350 pr.mutex.Lock()
351 defer pr.mutex.Unlock()
352
353 log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
354
355 var response []Revision
Thomas Lee Se5a44012019-11-07 20:32:24 +0530356 var err error
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400357
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400358 for strings.HasPrefix(path, "/") {
359 path = path[1:]
360 }
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400361
362 if pr.kvStore != nil && path != "" {
npujar9a30c702019-11-14 17:06:39 +0530363 if len(blobs) == 0 {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400364 log.Debugw("retrieve-from-kv", log.Fields{"path": path, "txid": txid})
Thomas Lee Se5a44012019-11-07 20:32:24 +0530365
npujar467fe752020-01-16 20:17:45 +0530366 if blobs, err = pr.kvStore.List(ctx, path); err != nil {
Thomas Lee Se5a44012019-11-07 20:32:24 +0530367 log.Errorw("failed-to-retrieve-data-from-kvstore", log.Fields{"error": err})
368 return nil, err
369 }
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400370 }
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400371
372 partition := strings.SplitN(path, "/", 2)
373 name := partition[0]
374
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400375 var nodeType interface{}
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400376 if len(partition) < 2 {
377 path = ""
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400378 nodeType = pr.GetBranch().Node.Type
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400379 } else {
380 path = partition[1]
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400381 nodeType = pr.GetBranch().Node.GetRoot().Type
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400382 }
383
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400384 field := ChildrenFields(nodeType)[name]
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400385
386 if field != nil && field.IsContainer {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400387 log.Debugw("parsing-data-blobs", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400388 "path": path,
389 "name": name,
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400390 "size": len(blobs),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400391 })
392
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400393 for _, blob := range blobs {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400394 output := blob.Value.([]byte)
395
396 data := reflect.New(field.ClassType.Elem())
397
398 if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400399 log.Errorw("failed-to-unmarshal", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400400 "path": path,
401 "txid": txid,
402 "error": err,
403 })
404 } else if path == "" {
405 if field.Key != "" {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400406 log.Debugw("no-path-with-container-key", log.Fields{
407 "path": path,
408 "txid": txid,
409 "data": data.Interface(),
410 })
411
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400412 // Retrieve the key identifier value from the data structure
413 // based on the field's key attribute
414 _, key := GetAttributeValue(data.Interface(), field.Key, 0)
415
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400416 if entry := pr.verifyPersistedEntry(ctx, data.Interface(), name, field.Key, key.String(), txid, blob.Version); entry != nil {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400417 response = append(response, entry)
418 }
Stephane Barbarie802aca42019-05-21 12:19:28 -0400419 } else {
420 log.Debugw("path-with-no-container-key", log.Fields{
421 "path": path,
422 "txid": txid,
423 "data": data.Interface(),
424 })
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400425 }
426
427 } else if field.Key != "" {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400428 log.Debugw("path-with-container-key", log.Fields{
429 "path": path,
430 "txid": txid,
431 "data": data.Interface(),
432 })
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400433 // The request is for a specific entry/id
434 partition := strings.SplitN(path, "/", 2)
435 key := partition[0]
436 if len(partition) < 2 {
437 path = ""
438 } else {
439 path = partition[1]
440 }
441 keyValue := field.KeyFromStr(key)
442
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400443 if entry := pr.verifyPersistedEntry(ctx, data.Interface(), name, field.Key, keyValue.(string), txid, blob.Version); entry != nil {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400444 response = append(response, entry)
445 }
446 }
447 }
448
Stephane Barbarie802aca42019-05-21 12:19:28 -0400449 log.Debugw("no-more-data-blobs", log.Fields{"path": path, "name": name})
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400450 } else {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400451 log.Debugw("cannot-process-field", log.Fields{
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400452 "type": pr.GetBranch().Node.Type,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400453 "name": name,
454 })
455 }
456 }
457
Thomas Lee Se5a44012019-11-07 20:32:24 +0530458 return response, nil
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400459}