/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Stephane Barbariedc5022d2018-11-19 15:21:44 -050016
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040017package model
18
19import (
20 "bytes"
21 "compress/gzip"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040022 "github.com/golang/protobuf/proto"
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040023 "github.com/opencord/voltha-go/common/log"
Stephane Barbariee0a4c792019-01-16 11:26:29 -050024 "github.com/opencord/voltha-go/db/kvstore"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040025 "reflect"
Stephane Barbariedc5022d2018-11-19 15:21:44 -050026 "runtime/debug"
Stephane Barbarie1ab43272018-12-08 21:42:13 -050027 "strings"
Stephane Barbariedc5022d2018-11-19 15:21:44 -050028 "sync"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040029)
30
Stephane Barbariedc5022d2018-11-19 15:21:44 -050031// PersistedRevision holds information of revision meant to be saved in a persistent storage
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040032type PersistedRevision struct {
Stephane Barbarieec0919b2018-09-05 14:14:29 -040033 Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040034 Compress bool
Stephane Barbariee0a4c792019-01-16 11:26:29 -050035
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040036 events chan *kvstore.Event
37 kvStore *Backend
38 mutex sync.RWMutex
Stephane Barbarie3cb01222019-01-16 17:15:56 -050039 isStored bool
40 isWatched bool
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040041}
42
// watchCache records the key/revision-hash pairs for which a watch has
// already been set up (see SetupWatch).
type watchCache struct {
	Cache sync.Map
}

var (
	// watchCacheInstance is the lazily created singleton returned by Watches.
	watchCacheInstance *watchCache
	// watchCacheOne guards the one-time creation of watchCacheInstance.
	watchCacheOne sync.Once
)

// Watches returns the shared watchCache, creating it on the first call.
func Watches() *watchCache {
	watchCacheOne.Do(func() {
		// The zero value of sync.Map is an empty, ready-to-use map.
		watchCacheInstance = &watchCache{}
	})
	return watchCacheInstance
}
56
Stephane Barbariedc5022d2018-11-19 15:21:44 -050057// NewPersistedRevision creates a new instance of a PersistentRevision structure
Stephane Barbarieec0919b2018-09-05 14:14:29 -040058func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040059 pr := &PersistedRevision{}
Stephane Barbariedc5022d2018-11-19 15:21:44 -050060 pr.kvStore = branch.Node.GetRoot().KvStore
61 pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040062 return pr
63}
64
Stephane Barbariedc5022d2018-11-19 15:21:44 -050065// Finalize is responsible of saving the revision in the persistent storage
Stephane Barbarie1ab43272018-12-08 21:42:13 -050066func (pr *PersistedRevision) Finalize(skipOnExist bool) {
67 pr.store(skipOnExist)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040068}
69
Stephane Barbarie1ab43272018-12-08 21:42:13 -050070func (pr *PersistedRevision) store(skipOnExist bool) {
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040071 if pr.GetBranch().Txid != "" {
72 return
73 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040074
Stephane Barbarief7fc1782019-03-28 22:33:41 -040075 if pair, _ := pr.kvStore.Get(pr.GetName()); pair != nil && skipOnExist {
76 log.Debugw("revision-config-already-exists", log.Fields{"hash": pr.GetHash(), "name": pr.GetName()})
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040077 return
78 }
Stephane Barbarie1ab43272018-12-08 21:42:13 -050079
Stephane Barbarieec0919b2018-09-05 14:14:29 -040080 if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040081 // TODO report error
82 } else {
83 if pr.Compress {
84 var b bytes.Buffer
85 w := gzip.NewWriter(&b)
86 w.Write(blob)
87 w.Close()
88 blob = b.Bytes()
89 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -050090
Stephane Barbarief7fc1782019-03-28 22:33:41 -040091 if err := pr.kvStore.Put(pr.GetName(), blob); err != nil {
Stephane Barbariee0a4c792019-01-16 11:26:29 -050092 log.Warnw("problem-storing-revision-config",
Stephane Barbarief7fc1782019-03-28 22:33:41 -040093 log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
Stephane Barbariedc5022d2018-11-19 15:21:44 -050094 } else {
Stephane Barbarief7fc1782019-03-28 22:33:41 -040095 log.Debugw("storing-revision-config", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
Stephane Barbarie3cb01222019-01-16 17:15:56 -050096 pr.isStored = true
Stephane Barbariedc5022d2018-11-19 15:21:44 -050097 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040098 }
99}
100
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500101func (pr *PersistedRevision) SetupWatch(key string) {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400102 if key == "" {
103 log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
104 return
105 }
106
107 if _, exists := Watches().Cache.LoadOrStore(key+"-"+pr.GetHash(), struct{}{}); exists {
108 return
109 }
110
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500111 if pr.events == nil {
112 pr.events = make(chan *kvstore.Event)
113
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400114 log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500115
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400116 pr.SetName(key)
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500117 pr.events = pr.kvStore.CreateWatch(key)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400118 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500119
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400120 if !pr.isWatched {
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500121 pr.isWatched = true
122
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400123 log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
124
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500125 // Start watching
126 go pr.startWatching()
127 }
128}
129
130func (pr *PersistedRevision) startWatching() {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400131 log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "stack": string(debug.Stack())})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500132
133StopWatchLoop:
134 for {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400135 if pr.IsDiscarded() {
136 break StopWatchLoop
137 }
138
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500139 select {
140 case event, ok := <-pr.events:
141 if !ok {
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400142 log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500143 break StopWatchLoop
144 }
145
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400146 log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": pr.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500147
148 switch event.EventType {
149 case kvstore.DELETE:
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400150 log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500151 pr.Revision.Drop("", true)
152 break StopWatchLoop
153
154 case kvstore.PUT:
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400155 log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500156
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400157 if dataPair, err := pr.kvStore.Get(pr.GetName()); err != nil || dataPair == nil {
158 log.Errorw("update-in-memory--key-retrieval-failed", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500159 } else {
Stephane Barbariedf5479f2019-01-29 22:13:00 -0500160 data := reflect.New(reflect.TypeOf(pr.GetData()).Elem())
161
162 if err := proto.Unmarshal(dataPair.Value.([]byte), data.Interface().(proto.Message)); err != nil {
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400163 log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
Stephane Barbariedf5479f2019-01-29 22:13:00 -0500164 } else {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400165 if pr.GetNode().GetProxy() != nil {
166 pr.LoadFromPersistence(pr.GetNode().GetProxy().getFullPath(), "")
167 }
Stephane Barbariedf5479f2019-01-29 22:13:00 -0500168 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500169 }
170
171 default:
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400172 log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "type": event.EventType})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500173 }
174 }
175 }
176
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400177 Watches().Cache.Delete(pr.GetName() + "-" + pr.GetHash())
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500178
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400179 log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "stack": string(debug.Stack())})
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400180}
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400181
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500182// UpdateData modifies the information in the data model and saves it in the persistent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400183func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500184 log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
185
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400186 newNPR := pr.Revision.UpdateData(data, branch)
187
188 newPR := &PersistedRevision{
189 Revision: newNPR,
190 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400191 kvStore: pr.kvStore,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400192 events: pr.events,
193 }
194
195 if newPR.GetHash() != pr.GetHash() {
196 newPR.isWatched = false
197 newPR.isStored = false
198 pr.Drop(branch.Txid, false)
199 newPR.SetupWatch(newPR.GetName())
200 } else {
201 newPR.isWatched = true
202 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400203 }
204
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400205 return newPR
206}
207
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500208// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400209func (pr *PersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500210 log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
211
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400212 newNPR := pr.Revision.UpdateChildren(name, children, branch)
213
214 newPR := &PersistedRevision{
215 Revision: newNPR,
216 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400217 kvStore: pr.kvStore,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400218 events: pr.events,
219 }
220
221 if newPR.GetHash() != pr.GetHash() {
222 newPR.isWatched = false
223 newPR.isStored = false
224 pr.Drop(branch.Txid, false)
225 newPR.SetupWatch(newPR.GetName())
226 } else {
227 newPR.isWatched = true
228 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400229 }
230
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400231 return newPR
232}
233
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500234// UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400235func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500236 log.Debugw("updating-all-persisted-children", log.Fields{"hash": pr.GetHash()})
237
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400238 newNPR := pr.Revision.UpdateAllChildren(children, branch)
239
240 newPR := &PersistedRevision{
241 Revision: newNPR,
242 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400243 kvStore: pr.kvStore,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400244 events: pr.events,
245 }
246
247 if newPR.GetHash() != pr.GetHash() {
248 newPR.isWatched = false
249 newPR.isStored = false
250 pr.Drop(branch.Txid, false)
251 newPR.SetupWatch(newPR.GetName())
252 } else {
253 newPR.isWatched = true
254 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400255 }
256
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400257 return newPR
258}
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400259
260// Drop takes care of eliminating a revision hash that is no longer needed
261// and its associated config when required
262func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400263 pr.Revision.Drop(txid, includeConfig)
264}
265
266// Drop takes care of eliminating a revision hash that is no longer needed
267// and its associated config when required
268func (pr *PersistedRevision) StorageDrop(txid string, includeConfig bool) {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500269 log.Debugw("dropping-revision",
270 log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash, "stack": string(debug.Stack())})
271
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500272 pr.mutex.Lock()
273 defer pr.mutex.Unlock()
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400274 if pr.kvStore != nil && txid == "" {
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500275 if pr.isStored {
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400276 if pr.isWatched {
277 pr.kvStore.DeleteWatch(pr.GetName(), pr.events)
278 pr.isWatched = false
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500279 }
280
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400281 if err := pr.kvStore.Delete(pr.GetName()); err != nil {
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500282 log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
283 } else {
284 pr.isStored = false
285 }
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400286 }
287
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400288 } else {
289 if includeConfig {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500290 log.Debugw("attempted-to-remove-transacted-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400291 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500292 log.Debugw("attempted-to-remove-transacted-revision", log.Fields{"hash": pr.GetHash(), "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400293 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500294
295 pr.Revision.Drop(txid, includeConfig)
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400296}
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400297
298// verifyPersistedEntry validates if the provided data is available or not in memory and applies updates as required
299func (pr *PersistedRevision) verifyPersistedEntry(data interface{}, typeName string, keyName string, keyValue string, txid string) (response Revision) {
300 rev := pr
301
302 children := make([]Revision, len(rev.GetBranch().GetLatest().GetChildren(typeName)))
303 copy(children, rev.GetBranch().GetLatest().GetChildren(typeName))
304
305 // Verify if the revision contains a child that matches that key
306 if childIdx, childRev := rev.GetNode().findRevByKey(rev.GetBranch().GetLatest().GetChildren(typeName), keyName, keyValue); childRev != nil {
307 // A child matching the provided key exists in memory
308 // Verify if the data differs to what was retrieved from persistence
309 if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() {
310 log.Debugw("verify-persisted-entry--data-is-different", log.Fields{
311 "key": childRev.GetHash(),
312 "name": childRev.GetName(),
313 })
314
315 // Data has changed; replace the child entry and update the parent revision
316 updatedChildRev := childRev.UpdateData(data, childRev.GetBranch())
317 updatedChildRev.SetupWatch(updatedChildRev.GetName())
318 childRev.Drop(txid, false)
319
320 if childIdx >= 0 {
321 children[childIdx] = updatedChildRev
322 } else {
323 children = append(children, updatedChildRev)
324 }
325
326 rev.GetBranch().LatestLock.Lock()
327 updatedRev := rev.UpdateChildren(typeName, children, rev.GetBranch())
328 rev.GetBranch().Node.makeLatest(rev.GetBranch(), updatedRev, nil)
329 rev.GetBranch().LatestLock.Unlock()
330
331 // Drop the previous child revision
332 rev.GetBranch().Node.Latest().ChildDrop(typeName, childRev.GetHash())
333
334 if updatedChildRev != nil {
335 log.Debugw("verify-persisted-entry--adding-child", log.Fields{
336 "key": updatedChildRev.GetHash(),
337 "name": updatedChildRev.GetName(),
338 })
339 response = updatedChildRev
340 }
341 } else {
342 // Data is the same. Continue to the next entry
343 log.Debugw("verify-persisted-entry--same-data", log.Fields{
344 "key": childRev.GetHash(),
345 "name": childRev.GetName(),
346 })
347 if childRev != nil {
348 log.Debugw("verify-persisted-entry--keeping-child", log.Fields{
349 "key": childRev.GetHash(),
350 "name": childRev.GetName(),
351 })
352 response = childRev
353 }
354 }
355 } else {
356 // There is no available child with that key value.
357 // Create a new child and update the parent revision.
358 log.Debugw("verify-persisted-entry--no-such-entry", log.Fields{
359 "key": keyValue,
360 "name": typeName,
361 })
362
363 // Construct a new child node with the retrieved persistence data
364 childRev = rev.GetBranch().Node.MakeNode(data, txid).Latest(txid)
365
366 // We need to start watching this entry for future changes
367 childRev.SetName(typeName + "/" + keyValue)
368
369 // Add the child to the parent revision
370 rev.GetBranch().LatestLock.Lock()
371 children = append(children, childRev)
372 updatedRev := rev.GetBranch().Node.Latest().UpdateChildren(typeName, children, rev.GetBranch())
373 childRev.SetupWatch(childRev.GetName())
374
375 //rev.GetBranch().Node.Latest().Drop(txid, false)
376 rev.GetBranch().Node.makeLatest(rev.GetBranch(), updatedRev, nil)
377 rev.GetBranch().LatestLock.Unlock()
378
379 // Child entry is valid and can be included in the response object
380 if childRev != nil {
381 log.Debugw("verify-persisted-entry--adding-child", log.Fields{
382 "key": childRev.GetHash(),
383 "name": childRev.GetName(),
384 })
385 response = childRev
386 }
387 }
388
389 return response
390}
391
392// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
393// by adding missing entries, updating changed entries and ignoring unchanged ones
394func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
395 pr.mutex.Lock()
396 defer pr.mutex.Unlock()
397
398 log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
399
400 var response []Revision
401 var rev Revision
402
403 rev = pr
404
405 if pr.kvStore != nil && path != "" {
406 blobMap, _ := pr.kvStore.List(path)
407
408 partition := strings.SplitN(path, "/", 2)
409 name := partition[0]
410
411 if len(partition) < 2 {
412 path = ""
413 } else {
414 path = partition[1]
415 }
416
417 field := ChildrenFields(rev.GetBranch().Node.Type)[name]
418
419 if field != nil && field.IsContainer {
420 log.Debugw("load-from-persistence--start-blobs", log.Fields{
421 "path": path,
422 "name": name,
423 "size": len(blobMap),
424 })
425
426 for _, blob := range blobMap {
427 output := blob.Value.([]byte)
428
429 data := reflect.New(field.ClassType.Elem())
430
431 if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
432 log.Errorw("load-from-persistence--failed-to-unmarshal", log.Fields{
433 "path": path,
434 "txid": txid,
435 "error": err,
436 })
437 } else if path == "" {
438 if field.Key != "" {
439 // Retrieve the key identifier value from the data structure
440 // based on the field's key attribute
441 _, key := GetAttributeValue(data.Interface(), field.Key, 0)
442
443 if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(), txid); entry != nil {
444 response = append(response, entry)
445 }
446 }
447
448 } else if field.Key != "" {
449 // The request is for a specific entry/id
450 partition := strings.SplitN(path, "/", 2)
451 key := partition[0]
452 if len(partition) < 2 {
453 path = ""
454 } else {
455 path = partition[1]
456 }
457 keyValue := field.KeyFromStr(key)
458
459 if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string), txid); entry != nil {
460 response = append(response, entry)
461 }
462 }
463 }
464
465 log.Debugw("load-from-persistence--end-blobs", log.Fields{"path": path, "name": name})
466 } else {
467 log.Debugw("load-from-persistence--cannot-process-field", log.Fields{
468 "type": rev.GetBranch().Node.Type,
469 "name": name,
470 })
471 }
472 }
473
474 return response
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400475}