blob: ea99cf7efac12359fede687355b7e3012445d9ff [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Stephane Barbariedc5022d2018-11-19 15:21:44 -050016
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040017package model
18
19import (
20 "bytes"
21 "compress/gzip"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040022 "github.com/golang/protobuf/proto"
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040023 "github.com/opencord/voltha-go/common/log"
Stephane Barbariee0a4c792019-01-16 11:26:29 -050024 "github.com/opencord/voltha-go/db/kvstore"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040025 "reflect"
Stephane Barbarie1ab43272018-12-08 21:42:13 -050026 "strings"
Stephane Barbariedc5022d2018-11-19 15:21:44 -050027 "sync"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040028)
29
Stephane Barbariedc5022d2018-11-19 15:21:44 -050030// PersistedRevision holds information of revision meant to be saved in a persistent storage
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040031type PersistedRevision struct {
Stephane Barbarieec0919b2018-09-05 14:14:29 -040032 Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040033 Compress bool
Stephane Barbariee0a4c792019-01-16 11:26:29 -050034
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040035 events chan *kvstore.Event
36 kvStore *Backend
37 mutex sync.RWMutex
Stephane Barbarie3cb01222019-01-16 17:15:56 -050038 isStored bool
39 isWatched bool
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040040}
41
// watchCache remembers which watches have already been set up. SetupWatch
// stores entries keyed by "<key>-<revision hash>" and the watch routine
// removes its entry when it exits.
type watchCache struct {
	Cache sync.Map
}

var (
	// watchCacheInstance is the lazily created singleton returned by Watches.
	watchCacheInstance *watchCache
	// watchCacheOne guarantees the singleton is created exactly once.
	watchCacheOne sync.Once
)

// Watches returns the shared watchCache instance, creating it on first use.
func Watches() *watchCache {
	watchCacheOne.Do(func() {
		// The zero value of sync.Map is ready to use.
		watchCacheInstance = new(watchCache)
	})
	return watchCacheInstance
}
55
Stephane Barbariedc5022d2018-11-19 15:21:44 -050056// NewPersistedRevision creates a new instance of a PersistentRevision structure
Stephane Barbarieec0919b2018-09-05 14:14:29 -040057func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040058 pr := &PersistedRevision{}
Stephane Barbariedc5022d2018-11-19 15:21:44 -050059 pr.kvStore = branch.Node.GetRoot().KvStore
60 pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040061 return pr
62}
63
Stephane Barbariedc5022d2018-11-19 15:21:44 -050064// Finalize is responsible of saving the revision in the persistent storage
Stephane Barbarie1ab43272018-12-08 21:42:13 -050065func (pr *PersistedRevision) Finalize(skipOnExist bool) {
66 pr.store(skipOnExist)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040067}
68
Stephane Barbarie1ab43272018-12-08 21:42:13 -050069func (pr *PersistedRevision) store(skipOnExist bool) {
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040070 if pr.GetBranch().Txid != "" {
71 return
72 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040073
Stephane Barbarie7512fc82019-05-07 12:25:46 -040074 log.Debugw("ready-to-store-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
Stephane Barbarie1ab43272018-12-08 21:42:13 -050075
Stephane Barbarieec0919b2018-09-05 14:14:29 -040076 if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040077 // TODO report error
78 } else {
79 if pr.Compress {
80 var b bytes.Buffer
81 w := gzip.NewWriter(&b)
82 w.Write(blob)
83 w.Close()
84 blob = b.Bytes()
85 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -050086
Stephane Barbarief7fc1782019-03-28 22:33:41 -040087 if err := pr.kvStore.Put(pr.GetName(), blob); err != nil {
Stephane Barbarie7512fc82019-05-07 12:25:46 -040088 log.Warnw("problem-storing-revision", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
Stephane Barbariedc5022d2018-11-19 15:21:44 -050089 } else {
Stephane Barbarie7512fc82019-05-07 12:25:46 -040090 log.Debugw("storing-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
Stephane Barbarie3cb01222019-01-16 17:15:56 -050091 pr.isStored = true
Stephane Barbariedc5022d2018-11-19 15:21:44 -050092 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040093 }
94}
95
Stephane Barbariee0a4c792019-01-16 11:26:29 -050096func (pr *PersistedRevision) SetupWatch(key string) {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -040097 if key == "" {
Stephane Barbarie7512fc82019-05-07 12:25:46 -040098 log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
Stephane Barbarie40fd3b22019-04-23 21:50:47 -040099 return
100 }
101
102 if _, exists := Watches().Cache.LoadOrStore(key+"-"+pr.GetHash(), struct{}{}); exists {
103 return
104 }
105
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500106 if pr.events == nil {
107 pr.events = make(chan *kvstore.Event)
108
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400109 log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500110
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400111 pr.SetName(key)
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500112 pr.events = pr.kvStore.CreateWatch(key)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400113 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500114
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400115 if !pr.isWatched {
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500116 pr.isWatched = true
117
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400118 log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash()})
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400119
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500120 // Start watching
121 go pr.startWatching()
122 }
123}
124
125func (pr *PersistedRevision) startWatching() {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400126 log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500127
128StopWatchLoop:
129 for {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400130 if pr.IsDiscarded() {
131 break StopWatchLoop
132 }
133
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500134 select {
135 case event, ok := <-pr.events:
136 if !ok {
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400137 log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500138 break StopWatchLoop
139 }
140
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400141 log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": pr.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500142
143 switch event.EventType {
144 case kvstore.DELETE:
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400145 log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500146 pr.Revision.Drop("", true)
147 break StopWatchLoop
148
149 case kvstore.PUT:
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400150 log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500151
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400152 data := reflect.New(reflect.TypeOf(pr.GetData()).Elem())
Stephane Barbariedf5479f2019-01-29 22:13:00 -0500153
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400154 if err := proto.Unmarshal(event.Value.([]byte), data.Interface().(proto.Message)); err != nil {
155 log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
156 } else {
157 var pathLock string
158 var pac *proxyAccessControl
159 var blobs map[string]*kvstore.KVPair
160
161 // The watch reported new persistence data.
162 // Construct an object that will be used to update the memory
163 blobs = make(map[string]*kvstore.KVPair)
164 key, _ := kvstore.ToString(event.Key)
165 blobs[key] = &kvstore.KVPair{
166 Key: key,
167 Value: event.Value,
168 Session: "",
169 Lease: 0,
170 }
171
172 if pr.GetNode().GetProxy() != nil {
173 //
174 // If a proxy exists for this revision, use it to lock access to the path
175 // and prevent simultaneous updates to the object in memory
176 //
177 pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
178
179 //If the proxy already has a request in progress, then there is no need to process the watch
180 log.Debugw("checking-if-path-is-locked", log.Fields{"key": pr.GetHash(), "pathLock": pathLock})
181 if PAC().IsReserved(pathLock) {
182 log.Debugw("operation-in-progress", log.Fields{
183 "key": pr.GetHash(),
184 "path": pr.GetNode().GetProxy().getFullPath(),
185 "operation": pr.GetNode().GetRoot().GetProxy().Operation,
186 })
187
188 continue
189
190 // TODO/FIXME: keep logic for now in case we need to control based on running operation
191 //
192 // The code below seems to revert the in-memory/persistence value (occasionally) with
193 // the one received from the watch event.
194 //
195 // The same problem may occur, in the scenario where the core owning a device
196 // receives a watch event for an update made by another core, and when the owning core is
197 // also processing an update. Need to investigate...
198 //
199 //switch pr.GetNode().GetRoot().GetProxy().Operation {
200 //case PROXY_UPDATE:
201 // // We will need to reload once the update operation completes.
202 // // Therefore, the data of the current event is most likely out-dated
203 // // and should be ignored
204 // log.Debugw("reload-required", log.Fields{
205 // "key": pr.GetHash(),
206 // "path": pr.GetNode().GetProxy().getFullPath(),
207 // "operation": pr.GetNode().GetRoot().GetProxy().Operation,
208 // })
209 //
210 // // Eliminate the object constructed earlier
211 // blobs = nil
212 //
213 //case PROXY_ADD:
214 // fallthrough
215 //
216 //case PROXY_REMOVE:
217 // fallthrough
218 //
219 //case PROXY_GET:
220 // fallthrough
221 //
222 //default:
223 // // No need to process the event ... move on
224 // log.Debugw("", log.Fields{
225 // "key": pr.GetHash(),
226 // "path": pr.GetNode().GetProxy().getFullPath(),
227 // "operation": pr.GetNode().GetRoot().GetProxy().Operation,
228 // })
229 //
230 // continue
231 //}
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400232 }
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400233
234 // Reserve the path to prevent others to modify while we reload from persistence
235 log.Debugw("reserve-and-lock-path", log.Fields{"key": pr.GetHash(), "path": pathLock})
236 pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
237 pac.SetProxy(pr.GetNode().GetProxy())
238 pac.lock()
239
240 // Load changes and apply to memory
241 pr.LoadFromPersistence(pr.GetName(), "", blobs)
242
243 log.Debugw("release-and-unlock-path", log.Fields{"key": pr.GetHash(), "path": pathLock})
244 pac.unlock()
245 PAC().ReleasePath(pathLock)
246
247 } else {
248 log.Debugw("revision-with-no-proxy", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
249
250 // Load changes and apply to memory
251 pr.LoadFromPersistence(pr.GetName(), "", blobs)
Stephane Barbariedf5479f2019-01-29 22:13:00 -0500252 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500253 }
254
255 default:
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400256 log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "type": event.EventType})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500257 }
258 }
259 }
260
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400261 Watches().Cache.Delete(pr.GetName() + "-" + pr.GetHash())
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500262
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400263 log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400264}
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400265
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500266// UpdateData modifies the information in the data model and saves it in the persistent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400267func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500268 log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
269
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400270 newNPR := pr.Revision.UpdateData(data, branch)
271
272 newPR := &PersistedRevision{
273 Revision: newNPR,
274 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400275 kvStore: pr.kvStore,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400276 events: pr.events,
277 }
278
279 if newPR.GetHash() != pr.GetHash() {
280 newPR.isWatched = false
281 newPR.isStored = false
282 pr.Drop(branch.Txid, false)
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400283 pr.Drop(branch.Txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400284 } else {
285 newPR.isWatched = true
286 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400287 }
288
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400289 return newPR
290}
291
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500292// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400293func (pr *PersistedRevision) UpdateChildren(name string, children []Revision,
294 branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500295 log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
296
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400297 newNPR := pr.Revision.UpdateChildren(name, children, branch)
298
299 newPR := &PersistedRevision{
300 Revision: newNPR,
301 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400302 kvStore: pr.kvStore,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400303 events: pr.events,
304 }
305
306 if newPR.GetHash() != pr.GetHash() {
307 newPR.isWatched = false
308 newPR.isStored = false
309 pr.Drop(branch.Txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400310 } else {
311 newPR.isWatched = true
312 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400313 }
314
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400315 return newPR
316}
317
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500318// UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400319func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500320 log.Debugw("updating-all-persisted-children", log.Fields{"hash": pr.GetHash()})
321
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400322 newNPR := pr.Revision.UpdateAllChildren(children, branch)
323
324 newPR := &PersistedRevision{
325 Revision: newNPR,
326 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400327 kvStore: pr.kvStore,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400328 events: pr.events,
329 }
330
331 if newPR.GetHash() != pr.GetHash() {
332 newPR.isWatched = false
333 newPR.isStored = false
334 pr.Drop(branch.Txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400335 } else {
336 newPR.isWatched = true
337 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400338 }
339
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400340 return newPR
341}
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400342
343// Drop takes care of eliminating a revision hash that is no longer needed
344// and its associated config when required
345func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400346 pr.Revision.Drop(txid, includeConfig)
347}
348
349// Drop takes care of eliminating a revision hash that is no longer needed
350// and its associated config when required
351func (pr *PersistedRevision) StorageDrop(txid string, includeConfig bool) {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500352 log.Debugw("dropping-revision",
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400353 log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500354
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500355 pr.mutex.Lock()
356 defer pr.mutex.Unlock()
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400357 if pr.kvStore != nil && txid == "" {
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500358 if pr.isStored {
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400359 if pr.isWatched {
360 pr.kvStore.DeleteWatch(pr.GetName(), pr.events)
361 pr.isWatched = false
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500362 }
363
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400364 if err := pr.kvStore.Delete(pr.GetName()); err != nil {
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500365 log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
366 } else {
367 pr.isStored = false
368 }
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400369 }
370
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400371 } else {
372 if includeConfig {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500373 log.Debugw("attempted-to-remove-transacted-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400374 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500375 log.Debugw("attempted-to-remove-transacted-revision", log.Fields{"hash": pr.GetHash(), "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400376 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500377
378 pr.Revision.Drop(txid, includeConfig)
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400379}
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400380
381// verifyPersistedEntry validates if the provided data is available or not in memory and applies updates as required
382func (pr *PersistedRevision) verifyPersistedEntry(data interface{}, typeName string, keyName string, keyValue string, txid string) (response Revision) {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400383 //rev := pr
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400384
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400385 children := make([]Revision, len(pr.GetBranch().GetLatest().GetChildren(typeName)))
386 copy(children, pr.GetBranch().GetLatest().GetChildren(typeName))
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400387
388 // Verify if the revision contains a child that matches that key
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400389 if childIdx, childRev := pr.GetNode().findRevByKey(pr.GetBranch().GetLatest().GetChildren(typeName), keyName,
390 keyValue); childRev != nil {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400391 // A child matching the provided key exists in memory
392 // Verify if the data differs to what was retrieved from persistence
393 if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() {
394 log.Debugw("verify-persisted-entry--data-is-different", log.Fields{
395 "key": childRev.GetHash(),
396 "name": childRev.GetName(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400397 "data": childRev.GetData(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400398 })
399
400 // Data has changed; replace the child entry and update the parent revision
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400401 childRev.Drop(txid, false)
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400402 updatedChildRev := childRev.UpdateData(data, childRev.GetBranch())
403 updatedChildRev.GetNode().SetProxy(childRev.GetNode().GetProxy())
404 updatedChildRev.SetupWatch(updatedChildRev.GetName())
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400405
406 if childIdx >= 0 {
407 children[childIdx] = updatedChildRev
408 } else {
409 children = append(children, updatedChildRev)
410 }
411
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400412 pr.GetBranch().LatestLock.Lock()
413 updatedRev := pr.GetBranch().Node.Latest().UpdateChildren(typeName, children, pr.GetBranch())
414 pr.GetBranch().Node.makeLatest(pr.GetBranch(), updatedRev, nil)
415 pr.GetBranch().LatestLock.Unlock()
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400416
417 // Drop the previous child revision
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400418 pr.GetBranch().Node.Latest().ChildDrop(typeName, childRev.GetHash())
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400419
420 if updatedChildRev != nil {
421 log.Debugw("verify-persisted-entry--adding-child", log.Fields{
422 "key": updatedChildRev.GetHash(),
423 "name": updatedChildRev.GetName(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400424 "data": updatedChildRev.GetData(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400425 })
426 response = updatedChildRev
427 }
428 } else {
429 // Data is the same. Continue to the next entry
430 log.Debugw("verify-persisted-entry--same-data", log.Fields{
431 "key": childRev.GetHash(),
432 "name": childRev.GetName(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400433 "data": childRev.GetData(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400434 })
435 if childRev != nil {
436 log.Debugw("verify-persisted-entry--keeping-child", log.Fields{
437 "key": childRev.GetHash(),
438 "name": childRev.GetName(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400439 "data": childRev.GetData(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400440 })
441 response = childRev
442 }
443 }
444 } else {
445 // There is no available child with that key value.
446 // Create a new child and update the parent revision.
447 log.Debugw("verify-persisted-entry--no-such-entry", log.Fields{
448 "key": keyValue,
449 "name": typeName,
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400450 "data": data,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400451 })
452
453 // Construct a new child node with the retrieved persistence data
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400454 childRev = pr.GetBranch().Node.MakeNode(data, txid).Latest(txid)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400455
456 // We need to start watching this entry for future changes
457 childRev.SetName(typeName + "/" + keyValue)
458
459 // Add the child to the parent revision
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400460 pr.GetBranch().LatestLock.Lock()
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400461 children = append(children, childRev)
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400462 updatedRev := pr.GetBranch().Node.Latest().UpdateChildren(typeName, children, pr.GetBranch())
463 updatedRev.GetNode().SetProxy(pr.GetNode().GetProxy())
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400464 childRev.SetupWatch(childRev.GetName())
465
466 //rev.GetBranch().Node.Latest().Drop(txid, false)
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400467 pr.GetBranch().Node.makeLatest(pr.GetBranch(), updatedRev, nil)
468 pr.GetBranch().LatestLock.Unlock()
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400469
470 // Child entry is valid and can be included in the response object
471 if childRev != nil {
472 log.Debugw("verify-persisted-entry--adding-child", log.Fields{
473 "key": childRev.GetHash(),
474 "name": childRev.GetName(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400475 "data": childRev.GetData(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400476 })
477 response = childRev
478 }
479 }
480
481 return response
482}
483
484// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
485// by adding missing entries, updating changed entries and ignoring unchanged ones
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400486func (pr *PersistedRevision) LoadFromPersistence(path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400487 pr.mutex.Lock()
488 defer pr.mutex.Unlock()
489
490 log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
491
492 var response []Revision
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400493
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400494 for strings.HasPrefix(path, "/") {
495 path = path[1:]
496 }
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400497
498 if pr.kvStore != nil && path != "" {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400499 if blobs == nil || len(blobs) == 0 {
500 log.Debugw("retrieve-from-kv", log.Fields{"path": path, "txid": txid})
501 blobs, _ = pr.kvStore.List(path)
502 }
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400503
504 partition := strings.SplitN(path, "/", 2)
505 name := partition[0]
506
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400507 var nodeType interface{}
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400508 if len(partition) < 2 {
509 path = ""
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400510 nodeType = pr.GetBranch().Node.Type
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400511 } else {
512 path = partition[1]
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400513 nodeType = pr.GetBranch().Node.Root.Type
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400514 }
515
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400516 field := ChildrenFields(nodeType)[name]
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400517
518 if field != nil && field.IsContainer {
519 log.Debugw("load-from-persistence--start-blobs", log.Fields{
520 "path": path,
521 "name": name,
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400522 "size": len(blobs),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400523 })
524
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400525 for _, blob := range blobs {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400526 output := blob.Value.([]byte)
527
528 data := reflect.New(field.ClassType.Elem())
529
530 if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
531 log.Errorw("load-from-persistence--failed-to-unmarshal", log.Fields{
532 "path": path,
533 "txid": txid,
534 "error": err,
535 })
536 } else if path == "" {
537 if field.Key != "" {
538 // Retrieve the key identifier value from the data structure
539 // based on the field's key attribute
540 _, key := GetAttributeValue(data.Interface(), field.Key, 0)
541
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400542 if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(),
543 txid); entry != nil {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400544 response = append(response, entry)
545 }
546 }
547
548 } else if field.Key != "" {
549 // The request is for a specific entry/id
550 partition := strings.SplitN(path, "/", 2)
551 key := partition[0]
552 if len(partition) < 2 {
553 path = ""
554 } else {
555 path = partition[1]
556 }
557 keyValue := field.KeyFromStr(key)
558
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400559 if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string),
560 txid); entry != nil {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400561 response = append(response, entry)
562 }
563 }
564 }
565
566 log.Debugw("load-from-persistence--end-blobs", log.Fields{"path": path, "name": name})
567 } else {
568 log.Debugw("load-from-persistence--cannot-process-field", log.Fields{
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400569
570 "type": pr.GetBranch().Node.Type,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400571 "name": name,
572 })
573 }
574 }
575
576 return response
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400577}